1. Spark does not support DELETE/UPDATE on Hive tables
I need to insert data into a Hive table, deleting the existing rows first. But deleting rows from a Hive table through Spark throws an exception:
0: jdbc:hive2://master:10000> delete from d8 where id=1;
Error: org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: delete from(line 1, pos 0)

== SQL ==
delete from d8 where id=1
^^^
(state=,code=0)
2. Enabling Hive ACID so the table supports DELETE/UPDATE
Since Spark cannot DELETE, the only option left was to enable ACID on the Hive side and run the deletes through Hive itself; a sketch of the usual setup follows below.
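For reference, this is roughly what enabling ACID involves. The exact property list depends on the Hive version, and the d8 schema below is invented for illustration:

-- Settings commonly required for ACID, per session or in hive-site.xml:
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;
-- On Hive 1.x, bucketing enforcement must also be on:
SET hive.enforce.bucketing=true;

-- An ACID table must be bucketed, stored as ORC, and flagged transactional:
CREATE TABLE d8 (id INT, name STRING)
CLUSTERED BY (id) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

With that in place, delete from d8 where id=1 succeeds when run through Hive's own HiveServer2 rather than Spark.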
3. Spark still cannot read Hive transactional tables
With the configuration above, DELETE now works in Hive. But for data analysis I still need to read the Hive table from Spark, and Spark does not support reading Hive transactional tables; querying one fails with:
java.lang.RuntimeException: serious problem
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
  at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:340)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 49 elided
Caused by: java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "0000001_0000"
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
  ... 111 more
Caused by: java.lang.NumberFormatException: For input string: "0000001_0000"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at org.apache.hadoop.hive.ql.io.AcidUtils.parseDelta(AcidUtils.java:310)
  at org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState(AcidUtils.java:379)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:634)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:620)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
A quick Google search turned up https://issues.apache.org/jira/browse/SPARK-15348; note in the trace that the NumberFormatException is thrown from AcidUtils.parseDelta, i.e. Spark's bundled Hive code choking on the delta directories an ACID table keeps on disk. A comment on the issue sums it up:
Spark does not support any feature of Hive's transactional tables: you cannot use Spark to delete/update a table, and it also has problems reading the aggregated data when no compaction was done. Also it seems that compaction is not supported - alter table ... partition .... COMPACT 'major'
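For completeness, the compaction that comment alludes to is plain Hive DDL of this general shape, run from Hive itself, not Spark (d8 is just this post's example table):

ALTER TABLE d8 COMPACT 'major';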
In short: Spark does not support DELETE/UPDATE on Hive tables, and it cannot read the updatable/deletable Hive transactional tables either.
OK, so all of that configuration was wasted effort.
Solution:
Emulate deletion with data versioning instead: give every row two extra columns, an insert date (the version) and an isDelete flag, and adapt the details to your own business; a sketch follows below.
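A minimal sketch of that pattern, runnable as Spark SQL or HiveQL; the table name d8_v and the columns insert_date and is_delete are invented for illustration:

-- A plain, non-transactional table: Spark can both append to it and read it.
CREATE TABLE d8_v (
  id INT,
  name STRING,
  insert_date STRING,  -- version column, e.g. 'yyyy-MM-dd HH:mm:ss'
  is_delete BOOLEAN    -- soft-delete flag instead of a real DELETE
) STORED AS ORC;

-- "Deleting" id=1 just appends a newer version with is_delete=true:
INSERT INTO d8_v VALUES (1, 'a', '2018-01-01 00:00:00', true);

-- Readers keep only the newest version of each id and drop deleted rows:
CREATE VIEW d8_current AS
SELECT id, name
FROM (
  SELECT id, name, is_delete,
         row_number() OVER (PARTITION BY id ORDER BY insert_date DESC) AS rn
  FROM d8_v
) t
WHERE rn = 1 AND is_delete = false;

Analysis queries then go against d8_current.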
With this pattern the only thing left to dodge is Spark's inability to read ACID tables, and that is easy to dodge: the table stays a plain, non-transactional one, which Spark has always been able to read.