spark jdbc分区并发读取 mysql 大表

spark | 2019-09-13 10:02:39

spark的分区从读取数据就开始分区的,合理的分区不仅能避免错误而且能大幅度提高效率。

很多人在spark中使用默认提供的jdbc方法时,在数据库数据较大时经常发现任务 hang 住,其实是单线程任务过重导致,这时候需要提高读取的并发度。 以 mysql 3000W 数据量表为例,单分区count,僵死若干分钟报OOM。分成5-20个分区后,count 操作只需要 2s高并发度可以大幅度提高读取以及处理数据的速度,但是如果设置过高(大量的partition同时读取)也可能会将数据源数据库弄挂。


1.安装mysql-connector jar

方式一:直接将mysql-connector-java-5.1.34.jar分发到所有节点spark的jar中

方式二:在 spark-env.sh 文件中加入:export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar,任务提交时加入:--jars /path/mysql-connector-java-5.1.34.jar


2.单分区无并发读取mysql数据库

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
// 取得该表数据
val jdbcDF = sqlContext.read.jdbc(url,tableName,prop)

查看并发度

jdbcDF.rdd.partitions.size # 结果返回 1

如果数据量大就会内存溢出OOM


3.根据id Long类型字段分区

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
// 取得该表数据
val jdbcDF = sqlContext.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)


  url: String,
  table: String,
  columnName: String,    # 根据该字段分区,需要为整形,比如id等
  lowerBound: Long,      # 分区的下界
  upperBound: Long,      # 分区的上界
  numPartitions: Int,    # 分区的个数
  connectionProperties: Properties): DataFrame

查看并发度

jdbcDF.rdd.partitions.size # 结果返回 10

注意如果你的id字段时单调递增,单不时连续的,注意你单分区的数量,要避免数据倾斜,分区的数量要根据你数据库和spark集群来具体考虑,一般一百万是没问题的。就是几千万也能达到秒级。


4.根据任意类型字段分区

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
/**
* 将9月16-12月15三个月的数据取出,按时间分为6个partition
* 为了减少事例代码,这里的时间都是写死的
* modified_time 为时间字段
*/
val predicates =
    Array(
      "2015-09-16" -> "2015-09-30",
      "2015-10-01" -> "2015-10-15",
      "2015-10-16" -> "2015-10-31",
      "2015-11-01" -> "2015-11-14",
      "2015-11-15" -> "2015-11-30",
      "2015-12-01" -> "2015-12-15"
    ).map {
      case (start, end) =>
        s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
    }
// 取得该表数据
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)

查看并发度

jdbcDF.rdd.partitions.size # 结果返回 6

无论什么数据库,根据id来分区效率还是最高的,单这种方式灵活,最能保证单分区的数据量均匀。

登录后即可回复 登录 | 注册
    
关注编程学问公众号