Fixing out-of-memory errors caused by many repeated join computations in Spark

Reproducing the exception:

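The job reads data from MySQL and then runs a large number of aggregations and joins. The code contains many blocks like the following: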

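import org.apache.spark.sql.functions.{avg, round}

// IDs of the rows ranked in the top 10 (ds1 is a var; ds1 and ds2 are loaded earlier)
val dsTmp = ds2.where("RANK <= 10").select("ID")
// Join ds1 with an average score computed from ds1 itself
ds1 = ds1.join(
  ds1.join(dsTmp, "ID").groupBy("C_ID", "S_ID").agg(round(avg("SCORE"), 2).as("SCORE_AVG")),
  Seq("C_ID", "S_ID")
)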

The result is then written directly to a Hive table.

The log output below was exported from YARN.

Exception details:

stderr

Exception in thread "spark-listener-group-eventLog" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.lang.StringBuilder.toString(StringBuilder.java:407)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2933)
at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:141)
at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:233)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:76)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:82)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:89)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:89)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)

stdout

2019-01-30 10:09:57 ERROR FileFormatWriter:91 - Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(SUBJECT_ID#47, 200)
+- *(3400) Project [CLASS_ID#51, SUBJECT_ID#47, SCHOOL_ID#53, STUDENT_ID#49, ID#44L, PROJECT_ID#45, PROJECT_NAME#46, SUBJECT_NAME#48, STUDENT_NAME#50, CLASS_NAME#52, SCHOOL_NAME#54, OBJECTIVE_SCORE#55, SUBJECTIVE_SCORE#56, SUBJECT_SCORE#57, FULL_SCORE#58, TOTAL_SCORE#59, GRADE_NAME#60, UNION_TAG#61, START_SCHOOL_YEAR#62, GRADE#63, EXAM_START_DATE#64, SUBJECT_TYPE#65, SUBJECT_LEVEL#66, IS_EXCELLENT_SUBJECT#549, ... 16 more fields]
   +- *(3400) SortMergeJoin [CLASS_ID#51, SUBJECT_ID#47], [CLASS_ID#6636, SUBJECT_ID#6632], Inner
      :- *(1698) Sort [CLASS_ID#51 ASC NULLS FIRST, SUBJECT_ID#47 ASC NULLS FIRST], false, 0
.................
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
......................
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(CLASS_ID#6636, SUBJECT_ID#6632, 200)
+- *(3398) HashAggregate(keys=[CLASS_ID#6636, SUBJECT_ID#6632], functions=[partial_avg(UnscaledValue(SUBJECT_SCORE#6642))], output=[CLASS_ID#6636, SUBJECT_ID#6632, sum#42766, count#42767L])
   +- *(3398) Project [SUBJECT_ID#6632, CLASS_ID#6636, SUBJECT_SCORE#6642]
      +- *(3398) SortMergeJoin [STUDENT_ID#6634], [STUDENT_ID#7], Inner
..............................
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
..................................
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:166)
......................


Analysis:

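My data volume is not large, so why did the job run out of memory? When I removed a few of the computations shown above, the job ran fine, which suggests that the amount of computation was simply too large. Spark also evaluates lazily, so all of the deferred work piles up at the moment the result is written to the Hive table. The many variables and child RDDs defined along the way may also contribute to the out-of-memory error.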
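To make the lazy-evaluation point concrete, here is a minimal sketch that repeats the join-with-own-aggregate pattern on a tiny in-memory DataFrame and counts the nodes of the logical plan after each step. The object name, the sample rows, the SCORE_AVG_1, SCORE_AVG_2, ... column names, and the local[1] master are all assumptions made for illustration; they are not taken from the original job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, round}

object PlanGrowthSketch {
  // Repeats the join-with-own-aggregate pattern `steps` times and returns
  // the number of logical plan nodes after each step. Nothing is executed:
  // each join only extends the plan, because Spark evaluates lazily.
  def planSizes(spark: SparkSession, steps: Int): Seq[Int] = {
    import spark.implicits._
    // Hypothetical sample data; the real job reads from MySQL.
    var ds1: DataFrame = Seq((1, "c1", "s1", 90.0), (2, "c1", "s1", 80.0))
      .toDF("ID", "C_ID", "S_ID", "SCORE")
    val dsTmp = Seq(1, 2).toDF("ID")

    (1 to steps).map { i =>
      val agg = ds1.join(dsTmp, "ID")
        .groupBy("C_ID", "S_ID")
        .agg(round(avg("SCORE"), 2).as(s"SCORE_AVG_$i"))
      // ds1 appears on both sides of the join, so its whole plan is embedded twice.
      ds1 = ds1.join(agg, Seq("C_ID", "S_ID"))
      ds1.queryExecution.logical.collect { case node => node }.size
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("plan-growth-sketch")
      .getOrCreate()
    planSizes(spark, 4).zipWithIndex.foreach { case (size, i) =>
      println(s"after step ${i + 1}: $size logical plan nodes")
    }
    spark.stop()
  }
}
```

Because every step references the previous ds1 twice, the plan grows at least twice as large with each repetition, which is consistent with the very high stage IDs (such as 3400) in the log above.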


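Later I searched on Google. According to what I found, Cartesian-product style computations with many joins cause the child RDDs of a join to be broadcast to the other workers, and the memory available for broadcast variables is limited.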

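So I tried setting spark.sql.autoBroadcastJoinThreshold, but no matter how large I made it, or even when I set it to -1, it made no difference. In the end I solved the problem by trial and error.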
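For reference, this is a minimal sketch of how that setting can be applied, either when the session is built or at runtime. The object and method names and the local[1] master are assumptions for illustration. A value of -1 disables automatic broadcast joins; the default threshold is 10 MB.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastThresholdSketch {
  // Disables automatic broadcast joins on an existing session and
  // returns the value Spark now reports for the setting.
  def disableAutoBroadcast(spark: SparkSession): String = {
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("broadcast-threshold-sketch")
      // The same key can also be set up front, at session creation time.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    println(disableAutoBroadcast(spark))
    spark.stop()
  }
}
```

The same key can also be passed on the command line with spark-submit --conf. As noted above, changing it did not help in this case.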


Solution:

Call persist on the Dataset before writing to the table.

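I did not want the final computation to happen while the data was being written to the Hive table. Once I computed the data and loaded it into memory first, and only then wrote it to the Hive table, the problem went away.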

import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel
dataset.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
dataset.write.mode(SaveMode.Append).format("hive").saveAsTable(table)
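One detail worth keeping in mind: persist is itself lazy, so the data is only cached when the first action runs. The sketch below is a variation on the snippet above, not the exact code from my job. It forces the computation with count() before the write and releases the cache afterwards. The object and method names are hypothetical, and it writes Parquet files to a temporary directory as a stand-in for the Hive table so that it can run without a Hive setup.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object PersistThenWriteSketch {
  // Caches df, forces the computation with an action, then writes the
  // cached rows to `path`. Returns the number of rows written.
  def persistAndWrite(df: DataFrame, path: String): Long = {
    val cached = df.persist(StorageLevel.MEMORY_AND_DISK_SER)
    try {
      // count() is an action: it runs the whole plan and fills the cache.
      val rows = cached.count()
      // The write now reads from the cache instead of recomputing the joins.
      // Parquet is an assumption here; the original job uses format("hive").
      cached.write.mode(SaveMode.Append).parquet(path)
      rows
    } finally {
      cached.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("persist-then-write-sketch")
      .getOrCreate()
    import spark.implicits._
    val df = Seq((1, "c1", 90.0), (2, "c1", 80.0)).toDF("ID", "C_ID", "SCORE")
    val out = Files.createTempDirectory("persist-sketch").toString
    println(s"rows written: ${persistAndWrite(df, out)}")
    spark.stop()
  }
}
```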



