Spark 的 RDD/DataFrame 本质上是一个二维表,二维表可以直接写入 MongoDB;但 MongoDB 更加灵活,可以存储分层结构的 JSON 类型数据。
比如rdd二维表:
[
{student:"张三",subject:"语文",score:"90"},
{student:"张三",subject:"数学",score:"90"}
]
期望得到层次结构:
[
{student:"张三",score:{语文:"90",数学:"90"}}
]
下面的函数把 Spark DataFrame 二维表中的属性列逐层聚合为 map 类型列,从而得到层次结构:
/**
 * Collapses a flat DataFrame into a nested one by folding attribute columns
 * into map-typed columns, innermost attribute first.
 *
 * Example: rows {student, subject, score} with groupField = "student",
 * attributeField = Seq("subject"), valueField = Map("score" -> "") become one
 * row per student whose "score" column is a map of subject -> score.
 *
 * NOTE(review): despite the name, this returns a DataFrame, not an RDD; the
 * name is kept for caller compatibility.
 *
 * @param df             input DataFrame (one value per attribute combination)
 * @param groupField     column that identifies the top-level entity; always
 *                       retained as a grouping key
 * @param attributeField attribute columns, outermost first; each becomes one
 *                       nesting level of map keys (empty => df returned with
 *                       only the renames applied)
 * @param valueField     value column name -> output alias; an empty or null
 *                       alias keeps the original name. Must be non-empty
 *                       whenever attributeField is non-empty.
 * @return the nested DataFrame
 * @throws IllegalArgumentException if attributeField is non-empty but
 *                                  valueField is empty (nothing to aggregate)
 */
def toJsonRdd(df:DataFrame,groupField:String,attributeField:Seq[String],valueField:Map[String,String]): DataFrame = {
  // The original code crashed with an opaque IndexOutOfBoundsException here
  // (valueColumn(0) on an empty Seq); fail fast with a clear message instead.
  require(attributeField.isEmpty || valueField.nonEmpty,
    "valueField must contain at least one value column when attributeField is non-empty")

  // Fold from the innermost attribute (last) outwards. At level i we group by
  // groupField plus the attributes *before* i, and collapse attribute i into
  // map columns: one map per value column, keyed by attribute i's values.
  // After the first level the value columns are themselves maps, so each
  // further level wraps them into maps-of-maps, producing the nesting.
  val nested = attributeField.indices.reverse.foldLeft(df) { (current, i) =>
    val attributeName = attributeField(i)
    val groupCols = (groupField +: attributeField.take(i)).map(col)
    val mapCols = valueField.keys.toSeq.map { name =>
      map_from_entries(
        collect_list(struct(col(attributeName), col(name)))
      ).as(name) // keep the same column name so the next level finds it
    }
    // agg requires the first column separately from the rest (varargs API).
    current.groupBy(groupCols: _*).agg(mapCols.head, mapCols.tail: _*)
  }

  // Finally apply the requested aliases; blank/null alias means "keep name".
  valueField.foldLeft(nested) { case (acc, (name, alias)) =>
    if (Option(alias).exists(_.nonEmpty)) acc.withColumnRenamed(name, alias)
    else acc
  }
}