Spark Dataset read/write HBase example code

spark | 2019-09-13 10:02:39

1. Spark code for reading and writing HBase

Spark provides the saveAsNewAPIHadoopDataset method to write an RDD into HBase, and the newAPIHadoopRDD method to read HBase data into an RDD.


Maven dependencies

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
        <scala.main.version>2.11</scala.main.version>
        <hbase.version>1.4.9</hbase.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.1-HBase-1.4</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

Scala code

    // Imports assumed by this snippet (HBase 1.x client API, Hadoop MapReduce, phoenix-spark)
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.phoenix.spark._
    import org.apache.spark.sql.SparkSession

    // spark.hadoop.validateOutputSpecs needs to be disabled, otherwise output validation may fail
    val spark = SparkSession.builder().appName("testSpark")
      .config("spark.some.config.option", "some-value")
      .config("spark.hadoop.validateOutputSpecs", false)
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
//    An alternative way to connect to HBase:
//    val hbaseConf = HBaseConfiguration.create()
//    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "master,slave1,slave2,slave3,slave4")
//    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
//    val admin = hbaseConn.getAdmin
//    val jobConf = new JobConf(hbaseConf, this.getClass)
//    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "itxwtable")
//
//    val job = Job.getInstance(jobConf)
    
    //HBase connection configuration
    sc.hadoopConfiguration.set(HConstants.ZOOKEEPER_QUORUM,"master,slave1,slave2,slave3,slave4")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"itxwtable")
    sc.hadoopConfiguration.set(TableInputFormat.INPUT_TABLE, "itxwtable")
    val hbaseConn = ConnectionFactory.createConnection(sc.hadoopConfiguration)
    val admin = hbaseConn.getAdmin
    
    //Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf("itxwtable"))) {
      val desc = new HTableDescriptor(TableName.valueOf("itxwtable"))
      val hcd = new HColumnDescriptor("cf")
      desc.addFamily(hcd)
      admin.createTable(desc)
    }
    
    //Load a Dataset from Phoenix (phoenixTableAsDataFrame is added to the SQLContext by the phoenix-spark import)
    val dataset = spark.sqlContext.phoenixTableAsDataFrame("datacenter.itxwtable", Seq(), None, Option("master"))
    
    //Convert every column of the Dataset into HBase Puts
    val columns=dataset.columns
    val data = dataset.rdd.mapPartitions(par => {
      par.map(row => {
        //Choose the rowkey carefully: a monotonically increasing field as rowkey causes region hotspots
        val rowkey = row.getAs[Long]("ID")
        val put = new Put(Bytes.toBytes(rowkey))
        for (name <- columns) {
          val value = row.get(row.fieldIndex(name))
          if (value != null) {
            //The column family must match the one the table was created with ("cf" above)
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(name), Bytes.toBytes(value.toString))
          }
        }
        (new ImmutableBytesWritable, put)
      })
    })
    lazy val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
    //Save to HBase
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    
    //Read the data back from HBase
    val hBaseRDD = sc.newAPIHadoopRDD(sc.hadoopConfiguration,classOf[TableInputFormat],classOf[ImmutableBytesWritable], classOf[Result])
    spark.stop
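
The hBaseRDD returned by newAPIHadoopRDD is a pair RDD of (ImmutableBytesWritable, Result). A minimal sketch of consuming it (before spark.stop); the qualifier "NAME" is just a placeholder for one of the Phoenix columns written above:

    //Map to plain values before collecting: ImmutableBytesWritable and Result are not Java-serializable
    println(hBaseRDD.count())
    hBaseRDD.map { case (_, result) =>
      val rowkey = Bytes.toLong(result.getRow)   //the rowkey was written as a Long above
      val name = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("NAME")))
      (rowkey, name)
    }.take(10).foreach(println)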


2. A generic way to convert between an HBase RDD and a Dataset

Writing a Dataset to HBase as shown above is convenient, and so is reading HBase into an RDD, but data analysis is done on Dataset/DataFrame, so we need to convert the RDD into a Dataset.

As long as we have the field metadata, the RDD can easily be converted into a Dataset with the approach below.


JavaRDD<Row> dataRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
    @Override
    public Row call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
        List<String> recordList = new ArrayList<>();
        Result result = tuple._2;
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            recordList.add(new String(CellUtil.cloneValue(cell)));
        }
        return RowFactory.create(recordList.toArray());
    }
});
// Field information for the schema; hbaseColumnRDD is assumed to already hold the column names
ArrayList<StructField> structFields = new ArrayList<>();
List<String> fieldsList = hbaseColumnRDD.first();
for (String field : fieldsList) {
    structFields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}
// Build the schema and create the DataFrame
StructType schema = DataTypes.createStructType(structFields);
Dataset<Row> hbaseDataset = sparkSession.createDataFrame(dataRDD, schema);


So first of all we need the field information. A common approach is to keep an empty table in MySQL that has only the columns; Spark reads this empty table and thereby obtains the field information (a sketch of this follows).
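
A minimal sketch of that approach, assuming a MySQL table named itxwtable_fields kept only for its columns; the JDBC URL, user and password are placeholders:

    //Read the empty MySQL table only for its schema; the data itself is not needed
    val emptyDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://mysql-host:3306/datacenter")
      .option("dbtable", "itxwtable_fields")
      .option("user", "user")
      .option("password", "password")
      .load()
    val fieldNames = emptyDF.schema.fieldNames  //use these names to build the StructFields shown above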

But that is rather cumbersome. What I actually recommend is to store all fields as a JSON key-value string in a single column when writing to HBase; then reading just the first record already yields the field names, and the StructFields can be assembled from them. This approach is more flexible! (See the sketch below.)
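
A minimal sketch of the JSON-in-one-column idea, reusing dataset, columns and hBaseRDD from section 1; the qualifier "json" is an arbitrary choice of mine, and json4s (bundled with Spark) is used for serialization:

    //Write path: serialize the whole row into a single JSON column instead of one column per field
    import org.json4s.DefaultFormats
    import org.json4s.jackson.Serialization
    val jsonPuts = dataset.rdd.map { row =>
      implicit val formats = DefaultFormats
      val kv = columns.flatMap { name =>
        Option(row.get(row.fieldIndex(name))).map(v => name -> v.toString)
      }.toMap
      val put = new Put(Bytes.toBytes(row.getAs[Long]("ID")))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("json"), Bytes.toBytes(Serialization.write(kv)))
      (new ImmutableBytesWritable, put)
    }
    //jsonPuts is saved with saveAsNewAPIHadoopDataset exactly like data above

    //Read path: every JSON string carries its own field names, so Spark infers the schema automatically
    import spark.implicits._
    val jsonStrings = hBaseRDD.map { case (_, result) =>
      Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("json")))
    }.toDS()
    val hbaseDataset = spark.read.json(jsonStrings)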





