When processing data with Spark, you often need to add an auto-increment ID column to an entire Dataset or DataFrame. The ID is also a key element when writing to a database, since both partitioning and pagination rely on it. The Dataset and DataFrame APIs do not provide this directly, so it has to be built some other way, for example with the row_number window function or by converting to an RDD and using the RDD's zipWithIndex operator. The approaches below cover the recommended built-in function first, followed by the alternatives.
0. The go-to method: monotonically_increasing_id
import org.apache.spark.sql.functions._

df.withColumn("id", monotonically_increasing_id)
Note: in my own usage I still ran into duplicate ids.
So I renumber the rows with resultRdd = resultRdd.withColumn("id", row_number().over(Window.orderBy(monotonically_increasing_id))). This yields consecutive ids, but the extra global sort hurts performance when the data volume is large (see the sketch below).
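As a minimal, self-contained sketch of that workaround (the SparkSession setup and the sample data here are assumptions for illustration, not part of the original code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

val spark = SparkSession.builder().appName("add-id-demo").master("local[*]").getOrCreate()
import spark.implicits._

// hypothetical sample data; replace with your own DataFrame
val df = Seq(("a", 1.0), ("b", 2.0), ("c", 3.0)).toDF("k", "v")

// step 1: unique (but not necessarily consecutive) 64-bit ids
val withMono = df.withColumn("mono_id", monotonically_increasing_id())

// step 2: renumber over the mono_id ordering to get consecutive ids starting at 1;
// the unpartitioned window moves all rows to one partition, which is the performance cost noted above
val withId = withMono
  .withColumn("id", row_number().over(Window.orderBy($"mono_id")))
  .drop("mono_id")

withId.show()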
Source comment:
/**
 * A column expression that generates monotonically increasing 64-bit integers.
 *
 * The generated ID is guaranteed to be monotonically increasing and unique, but not
 * consecutive (which is usually all we need).
 * The current implementation puts the partition ID in the upper 31 bits, and the record number
 * within each partition in the lower 33 bits. The assumption is that the data frame has
 * less than 1 billion partitions, and each partition has less than 8 billion records.
 *
 * As an example, consider a `DataFrame` with two partitions, each with 3 records.
 * This expression would return the following IDs:
 *
 * {{{
 * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
 * }}}
 *
 * @group normal_funcs
 * @since 1.6.0
 */
def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
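Since the comment above fully specifies the bit layout, the partition id and in-partition record number can be read back out of a generated id with simple bit operations; this is only an illustration of that layout, not an API you would normally call:

// 8589934593L is the second id of partition 1 in the scaladoc example above
val id = 8589934593L
val partitionId  = id >> 33               // upper 31 bits -> 1
val recordNumber = id & ((1L << 33) - 1)  // lower 33 bits -> 1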
The other methods below are optional reading.
1. Using the row_number window function
1.1 Implementation
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// load the data
val dataframe = spark.read.csv(inputFile).toDF("lon", "lat")

/**
 * Set the window's partitioning and ordering. Because this is a global sort rather than a
 * grouped sort, the partitioning is left empty; any column will do as the ordering key.
 */
val spec = Window.partitionBy().orderBy($"lon")
val df1 = dataframe.withColumn("id", row_number().over(spec))
df1.show()
1.2 Result
+-----------+-----------+---+
|        lon|        lat| id|
+-----------+-----------+---+
|106.4273071|29.63554591|  1|
|  106.44104|29.51372023|  2|
|106.4602661|29.60211821|  3|
|106.4657593|29.45394812|  4|
+-----------+-----------+---+
1.3 Drawback
A window with an empty partitionBy moves all of the data into a single partition, so while this works, it noticeably degrades performance on large datasets.
2. Adding sequence numbers with zipWithIndex / zipWithUniqueId
The code below builds an auto-increment sequence with zipWithIndex; zipWithUniqueId works the same way (see the sketch at the end of this section). Both produce unique ids: zipWithIndex gives consecutive indices starting at 0 but triggers an extra Spark job when the RDD has more than one partition, while zipWithUniqueId skips that job at the cost of non-consecutive ids.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// add an "id" column to the original schema
val schema: StructType = dataframe.schema.add(StructField("id", LongType))

// convert the DataFrame to an RDD and call zipWithIndex
val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()
val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))

// turn the indexed RDD back into a DataFrame
val df2 = spark.createDataFrame(rowRDD, schema)
df2.show()
Result
+-----------+-----------+---+
|        lon|        lat| id|
+-----------+-----------+---+
|106.4273071|29.63554591|  0|
|  106.44104|29.51372023|  1|
|106.4602661|29.60211821|  2|
|106.4657593|29.45394812|  3|
+-----------+-----------+---+
This approach performs somewhat better than the window-function one, since it does not force everything into a single partition.
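For completeness, here is a sketch of the zipWithUniqueId variant mentioned at the start of this section, reusing the dataframe and spark names from the snippet above (the new variable names are illustrative); it avoids the extra Spark job that zipWithIndex needs, at the cost of non-consecutive ids:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// same pattern as above, but zipWithUniqueId does not trigger an extra Spark job;
// ids are unique but not consecutive: partition k gets k, k+n, k+2n, ... for n partitions
val schema2: StructType = dataframe.schema.add(StructField("id", LongType))
val uidRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithUniqueId()
val rowRDD2: RDD[Row] = uidRDD.map { case (row, uid) => Row.merge(row, Row(uid)) }
val df3 = spark.createDataFrame(rowRDD2, schema2)
df3.show()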
3. Using a running-count window function
SELECT *,
       count(*) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ordercolumn
FROM df
That is, count the rows from the very first row up to and including the current row, which produces a running 1-based sequence number.
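A minimal sketch of running that statement, assuming the dataframe from section 1 is registered as a temporary view named df (the view name and variable names are assumptions):

// register the DataFrame so it can be queried as "df"
dataframe.createOrReplaceTempView("df")

// running count from the first row to the current row acts as a 1-based sequence number;
// note: without an ORDER BY, the row order, and therefore the numbering, is not deterministic
val numbered = spark.sql(
  """SELECT *,
    |       count(*) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ordercolumn
    |FROM df""".stripMargin)
numbered.show()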