Spark Operations on Hive Partitioned Tables

spark | 2020-03-06 16:25:47

As mentioned in a previous post, Spark does not support Hive UPDATE or DELETE operations. Spark can, however, drop Hive partitions, which covers some deletion needs. Also note that Spark 2.3.0 has a bug when operating on partitioned tables, while Spark 2.3.3 handles Hive partitioned tables correctly; that post also covered how to upgrade Spark.
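For a concrete idea of what that looks like, a partition can be dropped from Spark with a single SQL statement (a minimal sketch, not from the original post; test_partition is the Hive table created in section 2.2 below, and year=2017 is one of its partitions):

// Dropping a whole partition deletes its data, which is the closest
// Spark gets to a row-level delete on a Hive table
spark.sql("alter table test_partition drop if exists partition (year=2017)")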

1. Creating a Partitioned Table with Spark

Code to create a partitioned table from Spark:

val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
//change append to overwrite to drop any existing table and create a new one
df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")

Check the partition metadata in Hive:

desc new_test_partition;

show create table new_test_partition;
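To list the concrete partitions themselves (one per year value here), Hive's show partitions command can also be used:

show partitions new_test_partition;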

The output below confirms that the new table does have the partition column year:

hive> desc new_test_partition;
OK
id                  	string              	                    
name                	string              	                    
age                 	int                 	                    
year                	string              	                    
	 	 
# Partition Information	 	 
# col_name            	data_type           	comment             
	 	 
year                	string              	                    
Time taken: 0.432 seconds, Fetched: 9 row(s)

2. Inserting Data into an Existing Partitioned Table

2.1 A partitioned table created by Spark

df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")

2.2 A partitioned table created from the Hive command line

create table test_partition (
id string comment 'ID', 
name string comment 'name',
age int comment 'age'
)
comment 'test partition table'
partitioned by (year int comment 'year')
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;

2.3 Saving partitioned data with Spark

df.write.mode("append").partitionBy("year").saveAsTable("test_partition")

This throws an exception, because saveAsTable defaults to Spark's built-in Parquet data source while the table created from the Hive CLI uses HiveFileFormat:

Exception in thread "main" org.apache.spark.sql.AnalysisException: The format of the existing table dkl.test_partition is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;

Solution

Specify the storage format explicitly with format:

df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition")

Alternative approaches

// Alternative 1: register a temp view and insert with SQL
df.createOrReplaceTempView("temp_table")
sql("insert into test_partition select * from temp_table")
// Alternative 2: write directly with insertInto
df.write.insertInto("test_partition")

insertInto routes each row to the correct partition automatically, based on the partition definition of the target table.
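One caveat: unlike saveAsTable, insertInto resolves columns by position, not by name, so the DataFrame's column order must match the table schema. A minimal sketch of aligning the columns first (the select list mirrors the DDL in section 2.2):

// insertInto is position-based: order the columns as (id, name, age, year)
// to match the table schema before writing
df.select("id", "name", "age", "year").write.insertInto("test_partition")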

 

Appendix: Complete Code

package com.dkl.blog.spark.hive

import org.apache.spark.sql.SparkSession

/**
 * Blog: Spark Operations on Hive Partitioned Tables
 * https://dongkelun.com/2018/12/04/sparkHivePatition/
 *
 */
object SparkHivePatition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkHive")
      .master("local")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .enableHiveSupport()
      .getOrCreate()

    import spark.sql

    val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))

    val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
    // Create a temporary view
    df.createOrReplaceTempView("temp_table")

    // Switch to the dkl Hive database
    sql("use dkl")
    //    1. Create the partitioned table; change append to overwrite to drop any existing table and create a new one
    df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
    //2. Write data into the partitioned table created by Spark (three equivalent ways)
    df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
    sql("insert into new_test_partition select * from temp_table")
    df.write.insertInto("new_test_partition")

    // Enable dynamic partitioning so insertInto can write to partitions determined at runtime
    sql("set hive.exec.dynamic.partition.mode=nonstrict")
    //3. Writing with saveAsTable to the partitioned table created with SQL in Hive throws an exception:
    //    df.write.mode("append").partitionBy("year").saveAsTable("test_partition")

    // 4. Workaround: specify the Hive format explicitly
    df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition")

    sql("insert into test_partition select * from temp_table")
    df.write.insertInto("test_partition")
    //This would throw an exception: insertInto can't be combined with partitionBy, because the table's partition columns are already defined
    //    df.write.partitionBy("year").insertInto("test_partition")

    spark.stop
  }
}

 

 
