spark sql collect_set、collect_list 、concat_ws都用来做字段拼接,其实和mysql中的字段拼接函数差不多,请参考
1.concat_ws()函数
concat_ws()-CONCAT With Separator ,用分隔符来拼接字段
spark中concat_ws()和Mysql中一样,所以不多解释,
看源码
/** * Concatenates multiple input string columns together into a single string column, * using the given separator. *使用指定的分隔符将多个列拼接到单个字符串列中 * * @group string_funcs * @since 1.5.0 */ @scala.annotation.varargs def concat_ws(sep: String, exprs: Column*): Column = withExpr { ConcatWs(Literal.create(sep, StringType) +: exprs.map(_.expr)) }
2.collect_set和collect_list
collect_set和collect_list,都是将分组后的单个列放到一个集合中
不同的是和java集合一样,Set不能存重复的值,是去重的,而list是不去重的
/** * Aggregate function: returns a list of objects with duplicates. *聚合函数返回可重复的list对象 * @group agg_funcs * @since 1.6.0 */ def collect_list(e: Column): Column = withAggregateFunction { CollectList(e.expr) } /** * Aggregate function: returns a set of objects with duplicate elements eliminated. *聚合函数返回去重后的set对象 * @group agg_funcs * @since 1.6.0 */ def collect_set(e: Column): Column = withAggregateFunction { CollectSet(e.expr) }
案例:
df.groupBy("age").agg(collect_set("name")).show()
返回结果
+---+-----------------+ |age|collect_set(name)| +---+-----------------+ | 20| [LI, Justin]| | 19| [Justin]| | 29| [Michael]| | 30| [Andy]| +---+-----------------+
3.联合使用实现分组拼接字段
记得mysql 中的group_concat嘛,collect_list 、concat_ws联合使用就能实现分组拼接的效果
sqlContext.sql("select concat_ws(',',collect_set(name)) as names from people group by age").show()
返回结果
+---------+---+ | names|age| +---------+---+ |LI,Justin| 20| | Justin| 19| | Michael| 29| Andy| 30| +---------+---+