大概步骤:
- 连接oracle,创建一个dataframe用来接收从oracle里面读取的数据。
- 将dataframe的数据写入临时表。
- 用hiveContext.sql语句将数据写入hive里面。
这个程序其实对于学了spark的人来说很简单,直接上代码吧:
package com.ctbri.cgs.oracle2Hive
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
object App {
def main(args: Array[String]):Unit = {
//创建一个sparkcontext对象,用enableHiveSupport获取了对HIVE的支持
val spark = SparkSession
.builder()
.appName("Oracle2Hive")
.master("local")
.config("spark.port.maxRetries","128")
.config("spark.sql.parquet.writeLegacyFormat",true)
.enableHiveSupport()
.getOrCreate()
//连接oracle
val jdbcDF = spark.read.format("jdbc").options(
Map(
"driver" -> "oracle.jdbc.driver.OracleDriver",
"url" -> "url路径",
"user" -> "username",
"password" -> "password",
"dbtable" -> "要导出的数据表名"
)).load()
//需要转换的列名
val colName = ArrayBuffer[String]()
val schema = jdbcDF.schema.foreach(s => {
if (s.dataType.equals(DecimalType(38, 10)) || s.dataType.equals(DecimalType(4, 0))) {
colName += s.name
}
})
//字段类型转换
var df_int = jdbcDF
colName.foreach(name => {
df_int = df_int.withColumn(name, col(name).cast(IntegerType))
})
//创建临时表
jdbcDF.createOrReplaceTempView("records")
spark.sql("use 库名")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
jdbcDF.write.mode("overwrite").saveAsTable("表名")
}
}
其中需要注意的就是,我第一次写的时候,没有进行字段类型的转换,导致数据可以导入,在hive里面也可以查看表属性,但是无法查出具体数据,原因就是spark导入的时候,将oracle的number类型转换成了decimal类型,导致无法查看,其他诸如data,char等都是成功的,进行一下类型转换就可以了。