1. What are zipWithIndex and zipWithUniqueId
As the names suggest, zipWithIndex zips each RDD element with its index, while zipWithUniqueId zips each element with a unique ID. Their main uses are as follows.
1.1. def zipWithIndex(): RDD[(T, Long)]
This function pairs each element of the RDD with that element's index (position) in the RDD, producing key/value pairs.
1.2. def zipWithUniqueId(): RDD[(T, Long)]
This function pairs each element of the RDD with a unique ID, producing key/value pairs. The unique ID is generated by the following rule:
The unique ID of the first element in each partition is that partition's index.
The unique ID of the Nth element in each partition is (the unique ID of the previous element) + (the total number of partitions of the RDD). A minimal sketch of the rule is shown below.
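The sketch is plain Scala (no Spark needed) and applies the rule to two hypothetical partitions; the values and the partition layout are made up for illustration:

// Two hypothetical partitions of an RDD whose total number of partitions is 2
val partitions = Seq(Seq("A", "B", "C"), Seq("D", "E", "F"))
val numPartitions = partitions.length

// The first element of partition k gets ID k; each following element adds numPartitions
val ids = partitions.zipWithIndex.flatMap { case (part, k) =>
  part.zipWithIndex.map { case (item, i) => (item, i.toLong * numPartitions + k) }
}
println(ids)  // List((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))

The result matches the spark-shell output in section 2.2 below.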
2. How to use them
2.1. zipWithIndex
scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at <console>:21

scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
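As a quick illustration of a typical use of these indices, here is a hedged sketch that drops the first line of an RDD of text lines (for example a CSV header); the lines RDD and its contents are hypothetical:

// Hypothetical RDD of lines whose first line is a header
val lines = sc.makeRDD(Seq("header", "row1", "row2"), 2)
val withoutHeader = lines.zipWithIndex()
  .filter { case (_, idx) => idx > 0 }   // index 0 is the header
  .map { case (line, _) => line }
withoutHeader.collect()   // expected: Array(row1, row2)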
2.2. zipWithUniqueId
scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at <console>:21
// rdd1 has two partitions

scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
// total number of partitions: 2
// the first element of the first partition gets ID 0; the first element of the second partition gets ID 1
// the second element of the first partition gets ID 0+2=2; the third element of the first partition gets ID 2+2=4
// the second element of the second partition gets ID 1+2=3; the third element of the second partition gets ID 3+2=5
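The Scaladoc quoted in section 3.2 notes that gaps may appear among the generated IDs. Here is a hedged sketch with uneven partitions; the expected result below is derived from the i*n + k rule rather than from an actual run:

// 5 elements over 2 partitions: partition 0 holds A,B and partition 1 holds C,D,E
val rdd3 = sc.makeRDD(Seq("A","B","C","D","E"), 2)
rdd3.zipWithUniqueId().collect()
// by the rule i*n + k: (A,0), (B,2), (C,1), (D,3), (E,5); ID 4 is never used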
3. Diving into the source code
3.1. zipWithIndex
/**
 * Zips this RDD with its element indices. The ordering is first based on the partition index
 * and then the ordering of items within each partition. So the first item in the first
 * partition gets index 0, and the last item in the last partition receives the largest index.
 *
 * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
 * This method needs to trigger a spark job when this RDD contains more than one partitions.
 *
 * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
 * elements in a partition. The index assigned to each element is therefore not guaranteed,
 * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
 * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
 */
def zipWithIndex(): RDD[(T, Long)] = withScope {
  new ZippedWithIndexRDD(this)
}
As the Scaladoc notes, "The ordering is first based on the partition index" and "the last item in the last partition receives the largest index": the IDs follow the partition order. The method simply creates a new ZippedWithIndexRDD, which assigns indices partition by partition, as sketched below.
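Roughly, this is how the indices get assigned (a simplified sketch, not the actual ZippedWithIndexRDD code): each partition needs to know how many elements precede it, so the per-partition counts must be computed first and turned into start offsets.

// Hypothetical element counts of the partitions; obtaining them is what requires
// a Spark job when there is more than one partition
val counts = Seq(3L, 2L, 4L)

// Start offset of partition k = sum of the counts of partitions 0 .. k-1
val startIndices = counts.scanLeft(0L)(_ + _).init   // Seq(0, 3, 5)

// Inside partition k, the element at local position i then gets index startIndices(k) + i

This is why the Scaladoc above says the method "needs to trigger a spark job when this RDD contains more than one partitions".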
3.2. zipWithUniqueId
/**
 * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
 * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
 * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
 *
 * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
 * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
 * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
 * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
 */
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
  val n = this.partitions.length.toLong
  this.mapPartitionsWithIndex { case (k, iter) =>
    Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
      (item, i * n + k)
    }
  }
}
The Scaladoc already states the ID rule: items in the kth partition "will get ids k, n+k, 2*n+k, ..., where n is the number of partitions". For example, with n = 2 partitions, the third element (i = 2) of partition k = 1 gets id 2*2 + 1 = 5, which matches the shell output in section 2.2.
An even more noteworthy sentence is: "won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]]": this method does not trigger a Spark job.
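The reason is visible in the method body above: each partition computes its IDs from purely local information, namely its own partition index k and each element's position i within the partition. A simplified, non-Spark sketch of the per-partition computation (leaving out the Utils helper):

// Zip the partition's iterator locally, then map position i to i*n + k.
// No data from other partitions is needed, hence no job has to run.
def uniqueIdsForPartition[T](iter: Iterator[T], k: Int, n: Long): Iterator[(T, Long)] =
  iter.zipWithIndex.map { case (item, i) => (item, i.toLong * n + k) }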
Now go back to the ZippedWithIndexRDD source: it calls prev.context.runJob, so this method actually launches a Spark job just to hand out indices. All I wanted was to number the elements in order, and it has to start a job for that? (As the Scaladoc above notes, the job is only needed when the RDD has more than one partition; with a single partition the start index is trivially 0.)
4. Summary
Both methods assign IDs to the elements of an RDD, but they differ in the following ways:
zipWithIndex assigns consecutive indices that follow the partition order; to know where each partition's numbering starts, it must first count the elements of the preceding partitions. zipWithUniqueId generates Long IDs by the rule "k, n+k, 2*n+k", so the IDs are guaranteed to be unique (though gaps may appear), which is presumably why it is named UniqueId.
zipWithUniqueId is also more efficient, because zipWithIndex has to launch a runJob first.
The two methods share a caveat, also noted in the Scaladoc: "Some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition", so the assigned IDs may change if the RDD is re-evaluated.
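Following the @note's own suggestion, here is a hedged sketch of how to make the assignments deterministic by sorting first; the example data is made up:

// Fix the order (and the partitioning) before zipping, so the IDs are reproducible
val pairs = sc.makeRDD(Seq(("b", 2), ("a", 1), ("c", 3)), 2)
val stableIds = pairs.sortByKey().zipWithUniqueId()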
So zipWithUniqueId is the recommended choice when you just need to attach unique IDs.