如何解决spark大量连续计算卡死的问题?

2019-10-10 16:48:54 | 编辑

问题描述:

spark程序数据量不大,可是计算很多,计算完的新字段都join到原来的dataset中,又反复依靠新的结果计算,反复join到原dataset,这就导致了又大量的join,rdd依赖变得异常复杂而且依赖也很长

spark程序计算代码如下

df=df.withColumn("IS_PASS_SUBJECT",when($"SUBJECT_SCORE"/$"FULL_SCORE" >=0.6,1).otherwise(0))
....大量的添加新列(上百个)

df=df.join(
  df.where("SCHOOL_RANK <= 10").groupBy("SCHOOL_ID","SUBJECT_ID").agg(round(avg("SUBJECT_SCORE"),2).as("SCHOOL_TOP10_SUBJECT_SCORE_AVG")),
  Seq("SCHOOL_ID","SUBJECT_ID"),"left"
)
....大量的计算新字段join到原来df

df=df.withColumn("SINGLE_SCORE_PROJECT_RANK", rank().over(Window.partitionBy($"PROJECT_ID",$"SUBJECT_ID").orderBy($"SUBJECT_SCORE".desc)))
...大量的根据前面计算的新列来计算新列

df=df.join(
  df.where("CLASS_RANK <= 10").groupBy("CLASS_ID","SUBJECT_ID").agg(round(avg("SUBJECT_SCORE"),2).as("CLASS_TOP10_SUBJECT_SCORE_AVG")),
  Seq("CLASS_ID","SUBJECT_ID"),"left"
)
....大量的根据前面计算的结果来计算新的结果然后join到原df

这样的话,spark程序就会在中间卡住,什么都不运行,也不报任何错误,也不重新划分job和stage,什么都卡住不动

我也知道如果没有action算子,spark会根据计算链条中的算子来划分stage,我的这个计算链条太长,我也不知道如何来解决卡死的问题?

登录后即可回复 登录 | 注册
    
  • admin
    admin

    解决代码
    dataset=dataset.persist()
    dataset=dataset.checkpoint()
    dataset.show()

  • admin
    admin

    问题解决,spark dataset提供checkpoint方法,注释介绍说是创建一个相当于快照的检查点输入到文件系统,来切断计算链血统,出现各种问题都将以此检查点来继续计算,以此来提高容错性。

    其实这个和我把dataset保存到hive表再取出来的解决方法一模一样,只不过checkpoint比较快,而且还有localCheckpoint可选择至于到底隔多少个迭代计算就执行checkpint,要根据自己的计算量和经验来判断,网上还比较推荐action算子和chache一起使用

  • admin
    admin

    我试过各种优化调整参数,增加内存和cpu,都不能直接解决问题,我越来越觉得spark就是有这个问题的,无法应对长链条的计算,spark是把一长串的依赖链条,保存下来了的,因为如果某个分区的数据损坏,spark是可以自动根据这个计算链条来恢复的,如果这个链条过于复杂处理就会过于慢

  • admin
    admin

    第一种解决方法:每隔几个运算我就把df的结果输出到hive,然后从hive读出来,这样读出来的df就是一个新的rdd,而不存在任何依赖,用这种方式,我暂时解决了卡住的问题,其实也就是避免了rdd依赖过长。

     

     

关注编程学问公众号