分片读取数据是解决性能防止数据倾斜最有效的方式。spark读取mongoDB分片的方式也有很多种,当然也要考虑mongoDB集群本身的分片方式。
1.spark 读取mongoDB分片的语法
var projectRdd=spark.read.format("com.mongodb.spark.sql")
.option("spark.mongodb.input.uri", "mongodb://192.168.1.1:23000/data.list")
.option("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
.option("spark.mongodb.input.partitionerOptions.shardkey","_id")
.load()
分区的方式用spark.mongodb.input.partitioner指定
分区的属性用spark.mongodb.input.partitionerOptions.xxx来指定
2.spark 读取mongoDB分片的方式
2.1 MongoDefaultPartitioner
默认的分片方式,包装了MongoSamplePartitioner 分片并提供对老版mongoDB的支持
.option("spark.mongodb.input.partitioner", "MongoDefaultPartitioner")
2.2 MongoSamplePartitioner
支持MongoDB 3.2。抽样分区。用于所有集群分区部署的通用方式。使用集合的平均文档大小和随机抽样来确定集合的适当分区
分区属性
partitionKey 默认 _id,用于拆分集合数据的字段。字段应包含唯一索引。
partitionSizeMB 默认 64MB,每个分区的MB大小
samplesPerPartition 默认 10,每个分区抽样的文档数
2.3 MongoShardedPartitioner
用于mongo分片的集群,基于数据块来分片,需要读取数据库配置的权限
分区属性
shardkey 默认 _id,用于拆分集合数据的字段。字段应包含唯一索引。
2.4 MongoSplitVectorPartitioner
用于mongoDB独立集群或者副本集。使用操作meta-data的splitVector命令再独立集群或者主节点来决定数据的分区。
分区属性
partitionKey 默认 _id,用于拆分集合数据的字段。字段应包含唯一索引。
partitionSizeMB 默认 64MB,每个分区的MB大小
2.5 MongoPaginateByCountPartitioner
一个缓慢通用的适用所有集群的分区方式,创建特定数量的分区。需要查询每个分区
分区属性
partitionKey 默认 _id,用于拆分集合数据的字段。字段应包含唯一索引。
numberOfPartitions 默认 64,分区的数量
2.6 MongoPaginateBySizePartitioner
一个缓慢通用的适用所有集群的分区方式,基于数据大小创建分区。需要查询每个分区
分区属性
partitionKey 默认 _id,用于拆分集合数据的字段。字段应包含唯一索引。
numberOfPartitions 默认 64,分区的数量
除了提供的分区方式外,还可以自定义分区的实现。对于MongoPartitioner自定义实现,请提供完整的类名。如果没有提供包名,则默认com.mongodb.spark.rdd.partitioner