spark从oracle导入数据到hive

spark | 2019-09-13 10:12:05

大概步骤:

  1. 连接oracle,创建一个dataframe用来接收从oracle里面读取的数据。
  2. 将dataframe的数据写入临时表。
  3. 用hiveContext.sql语句将数据写入hive里面。

这个程序其实对于学了spark的人来说很简单,直接上代码吧:

package com.ctbri.cgs.oracle2Hive
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    import java.util.Properties
    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.functions._
    
    object App {
      
      def main(args: Array[String]):Unit = {
      		//创建一个sparkcontext对象,用enableHiveSupport获取了对HIVE的支持
            val spark = SparkSession
            .builder()
            .appName("Oracle2Hive")
            .master("local")
            .config("spark.port.maxRetries","128")
            .config("spark.sql.parquet.writeLegacyFormat",true)
            .enableHiveSupport()
            .getOrCreate()
    
            //连接oracle
            val jdbcDF = spark.read.format("jdbc").options(
            Map(
            "driver" -> "oracle.jdbc.driver.OracleDriver",
            "url" -> "url路径",
            "user" -> "username",
            "password" -> "password",
            "dbtable" -> "要导出的数据表名"
            )).load()
       
            //需要转换的列名
            val colName = ArrayBuffer[String]()
            val schema = jdbcDF.schema.foreach(s => {
              if (s.dataType.equals(DecimalType(38, 10)) || s.dataType.equals(DecimalType(4, 0))) {
                colName += s.name
              }
            })
            
            //字段类型转换
            var df_int = jdbcDF
            colName.foreach(name => {
              df_int = df_int.withColumn(name, col(name).cast(IntegerType))
            })
           
            //创建临时表 
            jdbcDF.createOrReplaceTempView("records")
           
            spark.sql("use 库名")
            spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
            jdbcDF.write.mode("overwrite").saveAsTable("表名")
            }
    }

其中需要注意的就是,我第一次写的时候,没有进行字段类型的转换,导致数据可以导入,在hive里面也可以查看表属性,但是无法查出具体数据,原因就是spark导入的时候,将oracle的number类型转换成了decimal类型,导致无法查看,其他诸如data,char等都是成功的,进行一下类型转换就可以了。

登录后即可回复 登录 | 注册
    
  • admin
    admin

    和spark读取mysql一样的

关注编程学问公众号