Tracking down a source-level bug when Spark writes to a Hive partitioned table

spark | 2020-03-06 16:25:47

1. Problem scenario

The partitioned table is created in Hive first, and Spark is then used to insert data into it.
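For context, a minimal sketch of the setup. The original post does not show the Hive DDL, so the column names other than dt below are assumptions; the only essential point is that test_table_name is a Hive table partitioned by dt.

// Hypothetical DDL for the pre-created partitioned table; it can be run in the
// Hive CLI or, as here, through a Hive-enabled SparkSession.
sparkSession.sql(
  """CREATE TABLE IF NOT EXISTS test_table_name (
    |  id BIGINT,
    |  name STRING
    |)
    |PARTITIONED BY (dt STRING)
    |STORED AS ORC""".stripMargin)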

val result = sparkSession.createDataFrame(rdd, schema)

result.write.mode("append").format("hive").partitionBy("dt").saveAsTable("test_table_name")

2. Exception

org.apache.spark.SparkException: Requested partitioning does not match the test_table_name table:
Requested partitions: 
Table partitions: dt
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:141)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
	at org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:66)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
	// user class UserMainClass
	at com.jd.union.bonus.spark.streaming.UserMainClass$$anonfun$createContext$2.apply(UserMainClass.scala:202)
	at com.jd.union.bonus.spark.streaming.UserMainClass$$anonfun$createContext$2.apply(UserMainClass.scala:167)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

3. Diagnosis

From the first line of the thrown exception we can see:

org.apache.spark.SparkException: Requested partitioning does not match the test_table_name table:
Requested partitions: 
Table partitions: dt

At write time, Spark cannot determine which partition the data should be written into: the value after "Requested partitions:" is empty. Yet the Hive metastore shows that the target table is partitioned by the dt column ("Table partitions: dt"). The requested partitioning and the table's partitioning do not match, so the exception is thrown.
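For reference, the guard that throws this message lives in InsertIntoHiveTable.processInsert (the first frame of the stack trace above). Roughly paraphrased, and not the verbatim Spark source, it compares the partition spec handed to the insert command against the partition columns recorded in the metastore:

// Simplified paraphrase of the check in InsertIntoHiveTable.processInsert (Spark 2.3.x);
// `partition` is the partition spec passed with the insert command, and
// `table.partitionColumnNames` comes from the Hive metastore.
if (table.partitionColumnNames.toSet != partition.keySet) {
  throw new SparkException(
    s"""Requested partitioning does not match the ${table.identifier.table} table:
       |Requested partitions: ${partition.keys.mkString(",")}
       |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
}
// In the failing case the CTAS path produced an empty partition spec,
// so Set("dt") != Set() and the exception is raised.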

4. Solution

Spark 2.3.0 has a bug when writing to Hive partitioned tables; upgrade to Spark 2.3.3. See: how to upgrade Spark.

Official bug report: https://issues.apache.org/jira/browse/SPARK-26307
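If upgrading is not immediately possible, a commonly used workaround (not from the original post, so treat it as an assumption) is to skip the saveAsTable/CTAS path and append to the already-existing table with insertInto, which resolves the partition columns from the metastore. The partition column dt must then be the last column of the DataFrame:

// Hypothetical workaround sketch: dynamic-partition insert into the existing table.
sparkSession.conf.set("hive.exec.dynamic.partition", "true")
sparkSession.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

val result = sparkSession.createDataFrame(rdd, schema)   // dt must be the last column
result.write.mode("append").insertInto("test_table_name")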

 
