Exception reproduction:
Data is pulled from MySQL, followed by heavy aggregation and join computation. There is a lot of code like the following:
import org.apache.spark.sql.functions.{avg, round}

// Keep only the IDs ranked in the top 10, compute the average SCORE per (C_ID, S_ID)
// over the matching rows, then join that aggregate back onto ds1.
var dsTmp = ds2.where("RANK <= 10").select("ID")
ds1 = ds1.join(
  ds1.join(dsTmp, "ID")
    .groupBy("C_ID", "S_ID")
    .agg(round(avg("SCORE"), 2).as("SCORE_AVG")),
  Seq("C_ID", "S_ID")
)
The result is then written directly to a Hive table.
The log information below was exported from YARN.
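For context, the input side of the job looks roughly like the sketch below. The connection details (URL, table name, credentials) are placeholders, not the original code; it only shows the standard Spark JDBC reader that such a pipeline would use.

import org.apache.spark.sql.SparkSession

// Sketch only: url / dbtable / credentials are hypothetical placeholders.
val spark = SparkSession.builder()
  .appName("score-aggregation")
  .enableHiveSupport()
  .getOrCreate()

val ds2 = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/exam")
  .option("dbtable", "score_rank")
  .option("user", "***")
  .option("password", "***")
  .load()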
Exception information:
stderr
Exception in thread "spark-listener-group-eventLog" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2933)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:141)
    at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:233)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:76)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:82)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:89)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:89)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)
stdout
2019-01-30 10:09:57 ERROR FileFormatWriter:91 - Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(SUBJECT_ID#47, 200)
+- *(3400) Project [CLASS_ID#51, SUBJECT_ID#47, SCHOOL_ID#53, STUDENT_ID#49, ID#44L, PROJECT_ID#45, PROJECT_NAME#46, SUBJECT_NAME#48, STUDENT_NAME#50, CLASS_NAME#52, SCHOOL_NAME#54, OBJECTIVE_SCORE#55, SUBJECTIVE_SCORE#56, SUBJECT_SCORE#57, FULL_SCORE#58, TOTAL_SCORE#59, GRADE_NAME#60, UNION_TAG#61, START_SCHOOL_YEAR#62, GRADE#63, EXAM_START_DATE#64, SUBJECT_TYPE#65, SUBJECT_LEVEL#66, IS_EXCELLENT_SUBJECT#549, ... 16 more fields]
   +- *(3400) SortMergeJoin [CLASS_ID#51, SUBJECT_ID#47], [CLASS_ID#6636, SUBJECT_ID#6632], Inner
      :- *(1698) Sort [CLASS_ID#51 ASC NULLS FIRST, SUBJECT_ID#47 ASC NULLS FIRST], false, 0
      .................
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    ......................
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(CLASS_ID#6636, SUBJECT_ID#6632, 200)
+- *(3398) HashAggregate(keys=[CLASS_ID#6636, SUBJECT_ID#6632], functions=[partial_avg(UnscaledValue(SUBJECT_SCORE#6642))], output=[CLASS_ID#6636, SUBJECT_ID#6632, sum#42766, count#42767L])
   +- *(3398) Project [SUBJECT_ID#6632, CLASS_ID#6636, SUBJECT_SCORE#6642]
      +- *(3398) SortMergeJoin [STUDENT_ID#6634], [STUDENT_ID#7], Inner
      ..............................
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    ..................................
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:166)
    ......................
Exception analysis:
My data volume is not large, so why the out-of-memory error? When I removed a few of the computations above, the job ran fine, which shows the amount of computation was simply too large. On top of that, Spark evaluates lazily: deferred execution means that all of the accumulated computation is triggered only at the moment the result is written to the Hive table, and the many intermediate variables and child RDDs/Datasets defined along the way can also contribute to the memory overflow.
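A minimal sketch of what this lazy evaluation means in practice (the Dataset and column names are illustrative, not the original job): each transformation only extends the logical plan, and nothing actually runs until an action such as the final write is invoked, at which point every accumulated join and aggregation executes at once.

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{avg, round}

// Illustrative only: building the plan runs no Spark job yet.
def buildPlan(scores: DataFrame): DataFrame = {
  val top10 = scores.where("RANK <= 10").select("ID")   // lazy
  scores.join(top10, "ID")                               // lazy
    .groupBy("C_ID", "S_ID")
    .agg(round(avg("SCORE"), 2).as("SCORE_AVG"))          // still lazy
}

// Only an action (write / count / collect) materializes the whole chain,
// so all the accumulated joins and aggregations execute here at once.
def materialize(plan: DataFrame): Unit =
  plan.write.mode(SaveMode.Append).format("hive").saveAsTable("tmp_table")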
Later I did some googling: in Spark, this kind of Cartesian-product-scale computation from a large number of joins can cause the child RDDs of the join to be broadcast to the other workers, and the memory available for broadcast variables is limited.
So I tried tuning spark.sql.autoBroadcastJoinThreshold, but it made no difference no matter how large I set it, nor even when I set it to -1. In the end the problem was solved only through trial and error, as shown below.
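For reference, the threshold tuning looked roughly like this (a sketch, assuming a SparkSession named spark; -1 disables automatic broadcasting of the smaller join side entirely). Neither value helped in this case.

// Raise the broadcast-join threshold (bytes), or disable broadcast joins with -1.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600") // ~100 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")        // disable broadcast joins

The same settings can also be passed at submit time, e.g. spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1.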
Exception resolution:
Call persist on the Dataset before writing the table.
I did not want the final round of computation to be triggered only at the moment the output is written to the Hive table. Once I computed the data and loaded it into memory first, and only then wrote it to the Hive table, the problem went away:
import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel

// Cache the computed Dataset (serialized, 2x replicated, spilling to disk if needed)
// so the write reads cached data instead of re-running the whole plan.
dataset.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
dataset.write.mode(SaveMode.Append).format("hive").saveAsTable(table)
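One small variant worth noting (my own assumption, not part of the original fix): persist is itself lazy, so if you want the Dataset fully materialized in the cache before the table write even starts, you can trigger an explicit action such as count() first.

// Variant sketch: force materialization with an explicit action before writing.
dataset.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
dataset.count()                                                        // runs the plan and fills the cache
dataset.write.mode(SaveMode.Append).format("hive").saveAsTable(table)  // reads from the cache
dataset.unpersist()                                                    // optional: release the cache afterwards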