spark的数据集,无论rdd,dataframe,dataset都能用自带的api函数来方便的进行数据计算,包括聚合 分组 排序 统计 新增列 连接合并表等第,而不需要自己取实现怎么计算,或者自己写sql。
记录下各种需求下的计算代码,前提条件是搭建好集群,先引入下面的代码
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import spark.sqlContext.implicits._ //代码中间引入
//根据现有列计算添加新列(这里举例两个字段相除添加得到新一列)
Df.withColumn("newColumn",$"column1"/$"column2")
//根据现有列按条件计算添加新列
Df.withColumn("is_true",when($"c1"/$"c2" >=0.8,1).otherwise(0))
//分组统计字段值总数
Df.groupBy("STUDENT_ID").agg(sum("score").as("total_score"))
//分组统计字段值出现的次数
Df.groupBy("STUDENT_ID").agg(count(when($"grade_level" ==="A",1).otherwise(0)).as("countA"))
//按字段合并表 左连接
Df=Df.join( Df1, "STUDENT_ID" )
//按多个字段合并表 左连接
Df=Df.join( Df1, Seq("SCHOOL_ID","SUBJECT_ID") )
//分组求平均值
Df.groupBy("SCHOOL_ID").agg(avg("SCORE").as("SCORE_AVG_RANK"))
//分组求平均值 四舍五入保留2位小数
Df.groupBy("SCHOOL_ID").agg(round(avg("SCORE"),2).as("SCORE_AVG_RANK"))
//正序排序
Df.orderBy("SCORE_AVG_RANK")
//倒序排序
Df.orderBy(desc("SCORE_AVG_RANK"))
//计算排名 详见:
df.withColumn("SCORE_AVG_RANK", rank().over(Window.partitionBy().orderBy($"SCORE_AVG".desc)))
//格式化字符串,把值拼接到字符中
df.withColumn("value",format_string("第%d选择",$"value"))