spark dataset 实现自增id序列号的方法

spark | 2019-09-13 10:02:39

在用Spark 处理数据的时候,经常需要给dataset 或者 dataframe全量数据增加一列自增ID序号,在存入数据库的时候,自增ID也常常是一个很关键的要素,不论分区还是分页都需要使用id序号来实现。在Dataset和DataFrame的API中没有实现这一功能,所以只能通过其他方式实现,用row_number或者转成RDD再用RDD的 zipWithIndex 算子实现。下面呢就介绍两种实现方式。


0.终极方法 monotonically_increasing_id

import org.apache.spark.sql.functions._
df.withColumn("id", monotonically_increasing_id)

注:我在实际使用的时候还是遇到id重复的情况

所以:我使用resultRdd=resultRdd.withColumn("id",row_number().over(Window.orderBy(monotonically_increasing_id))) 再次排序,只是数据量过大会影响性能


源码解释:

/**
   * A column expression that generates monotonically increasing 64-bit integers.
   *  (保证是单调递增并且是唯一,但不保证是连续的,这就够了)
   * (The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.)
   * The current implementation puts the partition ID in the upper 31 bits, and the record number
   * within each partition in the lower 33 bits. The assumption is that the data frame has
   * less than 1 billion partitions, and each partition has less than 8 billion records.
   *
   * As an example, consider a `DataFrame` with two partitions, each with 3 records.
   * This expression would return the following IDs:
   *
   * {{{
   * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
   * }}}
   *
   * @group normal_funcs
   * @since 1.6.0
   */
  def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }



下面其他方法可以不看

1.使用row_number窗口函数方式

1.1实现代码

// 加载数据
val dataframe = spark.read.csv(inputFile).toDF("lon", "lat")
/**
      * 设置窗口函数的分区以及排序,因为是全局排序而不是分组排序,所有分区依据为空
      * 排序规则没有特殊要求也可以随意填写
      */
    val spec = Window.partitionBy().orderBy($"lon")
    val df1 = dataframe.withColumn("id", row_number().over(spec))
    df1.show()

2.2效果

+-----------+-----------+---+
|        lon|        lat| id|
+-----------+-----------+---+
|106.4273071|29.63554591|  1|
|  106.44104|29.51372023|  2|
|106.4602661|29.60211821|  3|
|106.4657593|29.45394812|  4|
+-----------+-----------+---+


2.3缺点

使用窗口函数那肯定是需要时间的,降低了性能


2.使用zipWithIndex,zipWithUniqueId方式添加序列号

zipWithIndex实现自增序列的代码,或者zipWithUniqueId,zipWithUniqueId能保证唯一性,zipWithIndex再多分区下不是唯一的

// 在原Schema信息的基础上添加一列 “id”信息
    val schema: StructType = dataframe.schema.add(StructField("id", LongType))
    // DataFrame转RDD 然后调用 zipWithIndex
    val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()
    val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))
    // 将添加了索引的RDD 转化为DataFrame
    val df2 = spark.createDataFrame(rowRDD, schema)
    df2.show()


效果

+-----------+-----------+---+
|        lon|        lat| id|
+-----------+-----------+---+
|106.4273071|29.63554591|  0|
|  106.44104|29.51372023|  1|
|106.4602661|29.60211821|  2|
|106.4657593|29.45394812|  3|
+-----------+-----------+---+

这种方式相对来说性能稍好点!


3.使用滚动统计窗口函数

select *,count(*) over(rows between unbounded preceding and current row) as ordercolumn from df

意思就是 从起始行到当前行进行计数







登录后即可回复 登录 | 注册
    
关注编程学问公众号