Reading a large MySQL table into a Spark Dataset
var dataset=sparkSession.read.format("jdbc") .option("url", "jdbc:mysql:") .option("dbtable", "") .option("user", "root") .option("password", "") .option("partitionColumn", "ID") .option("lowerBound", 1) .option("upperBound", 100000000) .option("numPartitions", 1000) .load();
This fails with the following exception:
2018-12-07 16:02:20 ERROR Executor:91 - Exception in task 9999.1 in stage 0.0 (TID 10000)
java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2175)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1999)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3504)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:490)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3198)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2366)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2789)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2815)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2322)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1130)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1128)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:68)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1128)
Resolution:
Looking at the stage in the Spark web UI (port 4040), I added up the rows read by the successful tasks: together they covered less than half of the table, while the single failed task was carrying a huge share of the data.
This is an OOM caused by data skew: one partition ends up with far more rows than its executor's heap can hold.
In fact, with the JDBC source most skew is introduced at read time. Spark splits the range [lowerBound, upperBound) evenly into numPartitions ranges on partitionColumn; the bounds do not filter rows, they only decide the split points. The first partition is open below and the last is open above, so if upperBound is far below the table's real maximum ID, every row above it piles into the last partition, as the sketch below shows.
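Here is a minimal sketch of the range predicates these settings produce. It assumes the simplified stride formula (upperBound - lowerBound) / numPartitions; the exact logic lives in Spark's JDBCRelation.columnPartition and varies slightly between versions:

// Illustrative only: how the JDBC source slices the ID range into WHERE clauses.
object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val lower = 1L
    val upper = 100000000L
    val numPartitions = 1000
    val stride = (upper - lower) / numPartitions  // 99999 IDs per range

    // The first partition also collects NULLs and everything below lowerBound.
    println(s"partition 0:   ID < ${lower + stride} OR ID IS NULL")
    // Middle partitions are bounded on both sides.
    println(s"partition 1:   ID >= ${lower + stride} AND ID < ${lower + 2 * stride}")
    // The last partition is open-ended: every row with ID >= its start,
    // including everything above upperBound, lands here.
    println(s"partition 999: ID >= ${lower + stride * (numPartitions - 1)}")
  }
}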
So: raise upperBound (and numPartitions if needed) until the bounds actually match the range of your partition column. The most robust approach is to query the column's real MIN/MAX first and derive the bounds from that.
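A minimal sketch of that fix, reusing the sparkSession and the (elided) url/user/password from above; my_table is a hypothetical table name standing in for the empty dbtable in the original:

// First fetch the real bounds of the partition column from MySQL.
val bounds = sparkSession.read.format("jdbc")
  .option("url", "jdbc:mysql:")
  .option("dbtable", "(SELECT MIN(ID) AS lo, MAX(ID) AS hi FROM my_table) AS b")  // my_table is a placeholder
  .option("user", "root")
  .option("password", "")
  .load()
  .first()

val lo = bounds.getAs[Number]("lo").longValue  // works whether ID maps to Int or Long
val hi = bounds.getAs[Number]("hi").longValue

// Then read the table with bounds that match the data, in place of the original read.
val dataset = sparkSession.read.format("jdbc")
  .option("url", "jdbc:mysql:")
  .option("dbtable", "my_table")
  .option("user", "root")
  .option("password", "")
  .option("partitionColumn", "ID")
  .option("lowerBound", lo)
  .option("upperBound", hi)
  .option("numPartitions", 1000)
  .load()

With the bounds derived from the data itself, each of the 1000 tasks scans a slice of roughly equal size, and no single task has to materialize the whole tail of the table.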