spark map操作实现join案例代码

spark | 2019-09-13 10:02:39

经过大量的实验,如果代码中有太多的join,会导致耗时无限长,因为会导致shuffle操作,降低性能。我就是join太多导致了卡死。

那么我们就需要使用mapPartitions和map来遍历操作来实现join,省掉shuffle。

案例代码:

//如果两个rdd遇到字段值相同,那么就连接
var subjectScoreDf:Dataset[Row]
var subjectTotalScoreDfExtend:Dataset[Row]
var subjectTotalScoreDfExtendBc=subjectTotalScoreDfExtend.sparkSession.sparkContext.broadcast(subjectTotalScoreDfExtend)
var subjectScoreFields=subjectScoreDf.columns.toSeq
var subjectScoreModelFields=subjectScoreFields++subjectTotalScoreExtendColumn.toSeq
var TMPsubjectScoreDf=subjectScoreDf.mapPartitions({ iter =>
  val subjectTotalScoreDfExtendRows = subjectTotalScoreDfExtendBc.value
  iter.map(row => {
var subjectTotalScoreRow:Row=null
for(bcRow:Row <- subjectTotalScoreDfExtendRows){
  var subjectTotalScoreStudentId=bcRow.getAs("STUDENT_ID").asInstanceOf[String]
  var subjectScoreStudentId=row.getAs("STUDENT_ID").asInstanceOf[String]
  if(subjectTotalScoreStudentId==subjectScoreStudentId){
subjectTotalScoreRow=bcRow
break
  }
}
//可以获取所有值拼接成字符串返回
//var res=row.getValuesMap(subjectScoreFields).values.toSeq
(row.getString("name"),bcRow.getString("age"))
  })
})
//转回 dataframe
TMPsubjectScoreDf.toDF(subjectScoreModelFields:_*)


注:map是由返回值组成了新的rdd,返回值是tuple元祖,tuple是不能改变长度的,也就是你新生产的rdd的列是定死的,当然你也可以返回字符串。


登录后即可回复 登录 | 注册
    
关注编程学问公众号