spark操作hive orc transactional事务表异常

hive | 2020-03-06 16:25:47

1.spark不支持hive表delete update

我需要往hive表中insert数据,在插入数据之前要先删除。但是使用spark操作删除hive表行,会出异常

0: jdbc:hive2://master:10000> delete from d8 where id=1;
Error: org.apache.spark.sql.catalyst.parser.ParseException: 
Operation not allowed: delete from(line 1, pos 0)
== SQL ==
delete from d8 where id=1
^^^ (state=,code=0)


2.配置hive表acid 支持 delete update

既然spark不能delete,我只能配置,通过hive操作delete数据

hive配置表transactional支持删除delete和update修改

3.spark还是不能读取hive transactional表

上面配置好可以在hive中delete了,但数据分析我还是要用spark读取hive表,但是spark是不支持读取hive transactional表的。查询hive transactional表出现异常。

java.lang.RuntimeException: serious problem
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
  at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:340)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 49 elided
Caused by: java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "0000001_0000"
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
  ... 111 more
Caused by: java.lang.NumberFormatException: For input string: "0000001_0000"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at org.apache.hadoop.hive.ql.io.AcidUtils.parseDelta(AcidUtils.java:310)
  at org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState(AcidUtils.java:379)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:634)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:620)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


google了一把https://issues.apache.org/jira/browse/SPARK-15348

Spark does not support any feature of hive's transnational tables,
you cannot use spark to delete/update a table and it also has problems reading the aggregated data when no compaction was done.
Also it seems that compaction is not supported - alter table ... partition .... COMPACT 'major'


spark 不支持hive 表 delete update,也不支持读取 能修改删除的hive transactional表。

ok,一切都瞎折腾了。


解决方案:

给数据加版本号来实现数据删除的效果,每条数据加入插入日期,和isDelete两个字段,具体业务具体设计吧。

这样就只需要避spark 不支持读取 acid的表,这个不难避。姿势对本来就支持。





登录后即可回复 登录 | 注册
    
  • admin
    admin

    最后使用hive的分区表实现了删除,解决了我的需求,但spark2.3.0不支持,我把spark jar升级到了2.3.3

关注编程学问公众号