After a lot of experimentation: when code contains too many joins, runtimes balloon, because every join triggers a shuffle, and shuffles kill performance. In my case, too many joins made the job hang outright.
So instead we can implement the join ourselves, using mapPartitions and map over a broadcast copy of the smaller dataset, skipping the shuffle entirely.
Example code:
```scala
import org.apache.spark.sql.{Dataset, Row}

// If a row from each of the two datasets has the same STUDENT_ID, join them.
// Broadcast the small side's collected rows (not the Dataset itself -- a
// Dataset cannot be used inside executor tasks) so each partition can do the
// lookup locally, with no shuffle.
val subjectScoreDf: Dataset[Row] = ???           // the large (streamed) side
val subjectTotalScoreDfExtend: Dataset[Row] = ??? // the small (broadcast) side

val subjectTotalScoreDfExtendBc =
  subjectScoreDf.sparkSession.sparkContext.broadcast(subjectTotalScoreDfExtend.collect())

val subjectScoreFields = subjectScoreDf.columns.toSeq
// subjectTotalScoreExtendColumn: the extra column names taken from the broadcast side
val subjectScoreModelFields = subjectScoreFields ++ subjectTotalScoreExtendColumn.toSeq

// Needed for the implicit Encoder of the tuples returned below
import subjectScoreDf.sparkSession.implicits._

val tmpSubjectScoreDf = subjectScoreDf.mapPartitions { iter =>
  val subjectTotalScoreDfExtendRows = subjectTotalScoreDfExtendBc.value
  iter.map { row =>
    val subjectScoreStudentId = row.getAs[String]("STUDENT_ID")
    // find stops at the first match; plain Scala has no `break`
    val subjectTotalScoreRow: Option[Row] = subjectTotalScoreDfExtendRows.find { bcRow =>
      bcRow.getAs[String]("STUDENT_ID") == subjectScoreStudentId
    }
    // You could also concatenate all the values into one string and return that:
    // val res = row.getValuesMap[Any](subjectScoreFields).values.toSeq
    (row.getAs[String]("name"), subjectTotalScoreRow.map(_.getAs[String]("age")).orNull)
  }
}
// Convert back to a DataFrame; the number of names must match the tuple arity
tmpSubjectScoreDf.toDF(subjectScoreModelFields: _*)
```
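One refinement worth considering: the `find` above scans the whole broadcast array once per input row. If the broadcast side is large, building a hash map keyed by STUDENT_ID once per partition makes each lookup O(1). A minimal sketch, reusing the names (and the `implicits._` import) from the example above:

```scala
// Build the lookup map once per partition from the broadcast value,
// then probe it for each incoming row instead of scanning the array.
val tmpSubjectScoreDf = subjectScoreDf.mapPartitions { iter =>
  val byStudentId: Map[String, Row] =
    subjectTotalScoreDfExtendBc.value
      .map(bcRow => bcRow.getAs[String]("STUDENT_ID") -> bcRow)
      .toMap
  iter.map { row =>
    val matched = byStudentId.get(row.getAs[String]("STUDENT_ID"))
    (row.getAs[String]("name"), matched.map(_.getAs[String]("age")).orNull)
  }
}
```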
Note: map builds the new dataset out of its return values. Here the return value is a tuple, and a tuple's arity cannot change, so the columns of the resulting dataset are fixed; the names you pass to toDF must match that arity. Alternatively, you can return a single string.
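To illustrate the string alternative the note mentions, here is a sketch, again reusing the names from the example above. It joins all values of the matched pair into one delimited string per row, so the column count is no longer fixed by a tuple's arity; the `\u0001` delimiter is an assumption and must never occur in the data:

```scala
// Each output row is one delimited string; split it back into columns later,
// e.g. with split(col("value"), "\u0001").
val joinedAsString = subjectScoreDf.mapPartitions { iter =>
  val rows = subjectTotalScoreDfExtendBc.value
  iter.map { row =>
    val matched = rows.find(_.getAs[String]("STUDENT_ID") == row.getAs[String]("STUDENT_ID"))
    val left  = row.toSeq.map(v => if (v == null) "" else v.toString)
    val right = matched.map(_.toSeq.map(v => if (v == null) "" else v.toString)).getOrElse(Seq.empty)
    (left ++ right).mkString("\u0001") // assumes '\u0001' never appears in the data
  }
}
```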