Spark exception: TimeoutException: Futures timed out after [1000 seconds]

spark | 2019-09-13 10:02:39

Exception details

My Spark application runs many jobs, but one of them keeps failing with: Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1000 seconds]


The full exception and stack trace are as follows:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(subject_score_model_SUBJECT_NAMEsort#2210 ASC NULLS FIRST, 200)
+- *(3) Project [subject_score_model_SUBJECT_NAME#2026, column747_223#2197, subject_score_model_START_SCHOOL_YEAR#2039, subject_score_model_SCHOOL_NAME#2032, subject_score_model_SUBJECT_LEVEL#2042, subject_score_model_EXAM_START_DATE#2041, subject_score_model_SUBJECT_NAMEsort#2210]
   +- *(3) BroadcastHashJoin [subject_score_model_SUBJECT_NAME#2026], [subject_score_model_SUBJECT_NAME#2209], LeftOuter, BuildRight
      :- *(3) Sort [subject_score_model_START_SCHOOL_YEAR#2039 ASC NULLS FIRST, subject_score_model_SCHOOL_NAME#2032 ASC NULLS FIRST, subject_score_model_SUBJECT_LEVEL#2042 ASC NULLS FIRST, subject_score_model_EXAM_START_DATE#2041 ASC NULLS FIRST, subject_score_model_SUBJECT_NAME#2026 ASC NULLS FIRST], true, 0
      :  +- Exchange rangepartitioning(subject_score_model_START_SCHOOL_YEAR#2039 ASC NULLS FIRST, subject_score_model_SCHOOL_NAME#2032 ASC NULLS FIRST, subject_score_model_SUBJECT_LEVEL#2042 ASC NULLS FIRST, subject_score_model_EXAM_START_DATE#2041 ASC NULLS FIRST, subject_score_model_SUBJECT_NAME#2026 ASC NULLS FIRST, 200)
      :     +- *(2) HashAggregate(keys=[subject_score_model_START_SCHOOL_YEAR#2039, subject_score_model_SCHOOL_NAME#2032, subject_score_model_SUBJECT_LEVEL#2042, subject_score_model_EXAM_START_DATE#2041, subject_score_model_SUBJECT_NAME#2026], functions=[avg(subject_score_model_SUBJECT_SCORE#2035)], output=[column747_223#2197, subject_score_model_START_SCHOOL_YEAR#2039, subject_score_model_SCHOOL_NAME#2032, subject_score_model_SUBJECT_LEVEL#2042, subject_score_model_EXAM_START_DATE#2041, subject_score_model_SUBJECT_NAME#2026])
      :        +- Exchange hashpartitioning(subject_score_model_START_SCHOOL_YEAR#2039, subject_score_model_SCHOOL_NAME#2032, subject_score_model_SUBJECT_LEVEL#2042, subject_score_model_EXAM_START_DATE#2041, subject_score_model_SUBJECT_NAME#2026, 200)
      :           +- *(1) HashAggregate(keys=[subject_score_model_START_SCHOOL_YEAR#2039, subject_score_model_SCHOOL_NAME#2032, subject_score_model_SUBJECT_LEVEL#2042, subject_score_model_EXAM_START_DATE#2041, subject_score_model_SUBJECT_NAME#2026], functions=[partial_avg(subject_score_model_SUBJECT_SCORE#2035)], output=[subject_score_model_START_SCHOOL_YEAR#2039, subject_score_model_SCHOOL_NAME#2032, subject_score_model_SUBJECT_LEVEL#2042, subject_score_model_EXAM_START_DATE#2041, subject_score_model_SUBJECT_NAME#2026, sum#2670, count#2671L])
      :              +- *(1) Project [SUBJECT_NAME#87 AS subject_score_model_SUBJECT_NAME#2026, SCHOOL_NAME#93 AS subject_score_model_SCHOOL_NAME#2032, SUBJECT_SCORE#96 AS subject_score_model_SUBJECT_SCORE#2035, START_SCHOOL_YEAR#100 AS subject_score_model_START_SCHOOL_YEAR#2039, EXAM_START_DATE#102 AS subject_score_model_EXAM_START_DATE#2041, SUBJECT_LEVEL#103 AS subject_score_model_SUBJECT_LEVEL#2042]
      :                 +- *(1) Filter (((((ALL_SUBJECT_LEVEL_TYPE#115 IN (其他全B以上,多A2B,多A1B) && SUBJECT_LEVEL#103 IN (A,B)) && ALL_SUBJECT_LEVEL_TYPE#115 IN (其他全B以上,多A2B,多A1B)) && SUBJECT_LEVEL#103 IN (A,B)) && ALL_SUBJECT_LEVEL_TYPE#115 IN (其他全B以上,多A2B,多A1B)) && SUBJECT_LEVEL#103 IN (A,B))
      :                    +- InMemoryTableScan [ALL_SUBJECT_LEVEL_TYPE#115, EXAM_START_DATE#102, SCHOOL_NAME#93, START_SCHOOL_YEAR#100, SUBJECT_LEVEL#103, SUBJECT_NAME#87, SUBJECT_SCORE#96], [ALL_SUBJECT_LEVEL_TYPE#115 IN (其他全B以上,多A2B,多A1B), SUBJECT_LEVEL#103 IN (A,B), ALL_SUBJECT_LEVEL_TYPE#115 IN (其他全B以上,多A2B,多A1B), SUBJECT_LEVEL#103 IN (A,B), ALL_SUBJECT_LEVEL_TYPE#115 IN (其他全B以上,多A2B,多A1B), SUBJECT_LEVEL#103 IN (A,B)]
      :                          +- InMemoryRelation [PROJECT_ID#83, ID#84L, PROJECT_NAME#85, SUBJECT_ID#86, SUBJECT_NAME#87, STUDENT_ID#88, STUDENT_NAME#89, CLASS_ID#90, CLASS_NAME#91, SCHOOL_ID#92, SCHOOL_NAME#93, OBJECTIVE_SCORE#94, SUBJECTIVE_SCORE#95, SUBJECT_SCORE#96, FULL_SCORE#97, TOTAL_SCORE#98, UNION_TAG#99, START_SCHOOL_YEAR#100, GRADE#101, EXAM_START_DATE#102, SUBJECT_LEVEL#103, SUBJECT_TYPE#104, FULL_SCORING_AVERAGE#105, TOTAL_SCORE_PROJECT_RANK#106, ... 59 more fields], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      :                                +- Scan ExistingRDD[PROJECT_ID#83,ID#84L,PROJECT_NAME#85,SUBJECT_ID#86,SUBJECT_NAME#87,STUDENT_ID#88,STUDENT_NAME#89,CLASS_ID#90,CLASS_NAME#91,SCHOOL_ID#92,SCHOOL_NAME#93,OBJECTIVE_SCORE#94,SUBJECTIVE_SCORE#95,SUBJECT_SCORE#96,FULL_SCORE#97,TOTAL_SCORE#98,UNION_TAG#99,START_SCHOOL_YEAR#100,GRADE#101,EXAM_START_DATE#102,SUBJECT_LEVEL#103,SUBJECT_TYPE#104,FULL_SCORING_AVERAGE#105,TOTAL_SCORE_PROJECT_RANK#106,... 59 more fields]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
         +- LocalTableScan [subject_score_model_SUBJECT_NAME#2209, subject_score_model_SUBJECT_NAMEsort#2210]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:92)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:42)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:97)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2902)
at com.report.tool.datacenter.engine.spark.action.HandlerSparkResult.saveResult(HandlerSparkResult.scala:197)
at com.report.tool.datacenter.engine.spark.SparkBatchEngineApp$$anonfun$main$2.apply(SparkBatchEngineApp.scala:143)
at com.report.tool.datacenter.engine.spark.SparkBatchEngineApp$$anonfun$main$2.apply(SparkBatchEngineApp.scala:105)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at com.report.tool.datacenter.engine.spark.SparkBatchEngineApp$.main(SparkBatchEngineApp.scala:105)
at com.report.tool.datacenter.engine.spark.SparkBatchEngineApp.main(SparkBatchEngineApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:706)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1000 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:280)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:103)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
at org.apache.spark.sql.execution.SortExec.consume(SortExec.scala:37)
at org.apache.spark.sql.execution.SortExec.doProduce(SortExec.scala:176)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SortExec.produce(SortExec.scala:37)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 40 more


Looking down the stack from the line Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1000 seconds], we can see a broadcast being executed (BroadcastExchangeExec.doExecuteBroadcast), which suggests that broadcasting a large result timed out.
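
To confirm which join strategy the planner actually chose, you can print the physical plan before triggering the action (a minimal sketch; df is a placeholder for the Dataset being persisted here):

// Look for BroadcastHashJoin / BroadcastExchange in the output to confirm
// that a broadcast is involved (df is a placeholder name)
df.explain()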


The [1000 seconds] in the message is the value I had configured in spark-defaults.conf:

spark.sql.broadcastTimeout 1000
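
You can verify the value the running session actually picked up via the runtime config API (a quick check, assuming spark is your SparkSession):

// Prints the effective broadcast timeout, in seconds
println(spark.conf.get("spark.sql.broadcastTimeout"))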


Solutions

  1. Call persist() to cache the DataFrame/RDD feeding the join, so the broadcast side does not have to be recomputed from scratch (see the sketch after this list)

  2. Increase spark.sql.broadcastTimeout further
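
For option 1, a minimal sketch under the assumption that the side of the join that gets broadcast can be materialized up front (all names and the input path below are placeholders, not from the original job):

import org.apache.spark.storage.StorageLevel

// smallDf stands for the side of the join that ends up broadcast;
// replace the source with the actual table or computation
val smallDf = spark.read.parquet("/path/to/small/input")
smallDf.persist(StorageLevel.MEMORY_AND_DISK)
smallDf.count()  // force materialization before the join triggers the broadcast

// bigDf likewise stands for the large side of the join
val joined = bigDf.join(smallDf, Seq("SUBJECT_NAME"), "left_outer")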


spark.sql.broadcastTimeout can be set in spark-defaults.conf, or programmatically when building the session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")  // timeout in seconds
  .getOrCreate()


