1. Spark code for reading and writing HBase
Spark provides the saveAsNewAPIHadoopDataset method for writing an RDD to HBase, and the newAPIHadoopRDD method for reading HBase data into an RDD.
Maven dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.3.0</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.main.version>2.11</scala.main.version>
    <hbase.version>1.4.9</hbase.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.main.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.main.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>4.14.1-HBase-1.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
Scala code
import org.apache.hadoop.hbase.{HColumnDescriptor, HConstants, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.phoenix.spark._
import org.apache.spark.sql.SparkSession

// Spark needs spark.hadoop.validateOutputSpecs disabled, otherwise output validation may fail
val spark = SparkSession.builder().appName("testSpark")
  .config("spark.some.config.option", "some-value")
  .config("spark.hadoop.validateOutputSpecs", false)
  .enableHiveSupport()
  .getOrCreate()
val sc = spark.sparkContext

// An alternative way to connect to HBase
// val hbaseConf = HBaseConfiguration.create()
// hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "master,slave1,slave2,slave3,slave4")
// val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
// val admin = hbaseConn.getAdmin
// val jobConf = new JobConf(hbaseConf, this.getClass)
// jobConf.set(TableOutputFormat.OUTPUT_TABLE, "itxwtable")
// val job = Job.getInstance(jobConf)

// HBase connection configuration
sc.hadoopConfiguration.set(HConstants.ZOOKEEPER_QUORUM, "master,slave1,slave2,slave3,slave4")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "itxwtable")
sc.hadoopConfiguration.set(TableInputFormat.INPUT_TABLE, "itxwtable")

val hbaseConn = ConnectionFactory.createConnection(sc.hadoopConfiguration)
val admin = hbaseConn.getAdmin

// Create the table if it does not exist
if (!admin.tableExists(TableName.valueOf("itxwtable"))) {
  val desc = new HTableDescriptor(TableName.valueOf("itxwtable"))
  val hcd = new HColumnDescriptor("cf")
  desc.addFamily(hcd)
  admin.createTable(desc)
}

// Load the source dataset from Phoenix
val dataset = spark.sqlContext.phoenixTableAsDataFrame("datacenter.itxwtable", Seq(), Option(""), Option("master"), null)

// Convert all columns of the dataset into HBase Puts
val columns = dataset.columns
val data = dataset.rdd.mapPartitions(par => {
  par.map(row => {
    // Choose the rowkey carefully: do not use a monotonically increasing field as the rowkey, it creates region hotspots
    val rowkey = row.getAs[Long]("ID")
    val put = new Put(Bytes.toBytes(rowkey))
    for (name <- columns) {
      val value = row.get(row.fieldIndex(name))
      if (value != null) {
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(name), Bytes.toBytes(value.toString))
      }
    }
    (new ImmutableBytesWritable, put)
  })
})

lazy val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Save to HBase
data.saveAsNewAPIHadoopDataset(job.getConfiguration)

// Read HBase data back into an RDD
val hBaseRDD = sc.newAPIHadoopRDD(sc.hadoopConfiguration, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

spark.stop()
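The read at the end only produces an RDD of (ImmutableBytesWritable, Result) pairs. As a minimal sketch of how to get cell values back out (it would run before the spark.stop() call; the "cf" family and "ID" qualifier are simply the ones written above), each Result can be unpacked with getValue:

// Sketch: unpack the Results returned by newAPIHadoopRDD (assumes the "cf"/"ID" column written above)
val idValues = hBaseRDD.map { case (_, result) =>
  val rowkey = Bytes.toLong(result.getRow)   // the rowkey was written from a Long above
  val id = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("ID")))
  (rowkey, id)
}
idValues.take(10).foreach(println)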
2. Generic conversion between HBase RDDs and Datasets
Writing a Dataset to HBase as shown above is convenient, and so is reading HBase into an RDD. But our data analysis is done on Datasets and DataFrames, so we need to convert the RDD into a Dataset.
As long as we have the field metadata, we can conveniently convert to a Dataset with the method below.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

// hbaseRDD, hbaseColumnRDD and sparkSession come from the surrounding job
JavaRDD<Row> dataRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
    @Override
    public Row call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
        List<String> recordList = new ArrayList<>();
        Result result = tuple._2;
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            recordList.add(new String(CellUtil.cloneValue(cell)));
        }
        return RowFactory.create(recordList.toArray());
    }
});

// Set the field information for the schema; hbaseColumnRDD holds the column names
ArrayList<StructField> structFields = new ArrayList<>();
List<String> fieldsList = hbaseColumnRDD.first();
for (String field : fieldsList) {
    structFields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}

// Build the schema and create the DataFrame
StructType schema = DataTypes.createStructType(structFields);
Dataset<Row> hbaseDataset = sparkSession.createDataFrame(dataRDD, schema);
This requires us to have the field information in the first place. A common approach is to keep an empty table in MySQL that only defines the columns; Spark reads this empty table and thereby picks up the field names.
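A rough sketch of that approach: Spark loads the empty table over JDBC and only its schema is used. The JDBC URL, credentials, and the meta_itxwtable name below are placeholders, not values from the article.

// Sketch: read an empty MySQL table purely for its column names (URL, user, password and table name are placeholders)
val metaDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://mysql-host:3306/datacenter")
  .option("dbtable", "meta_itxwtable")
  .option("user", "user")
  .option("password", "password")
  .load()

// The table has no rows, but its schema carries the field names we need
val columnNames = metaDf.schema.fieldNames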
But that is rather cumbersome. I prefer to store all fields as a JSON key-value string in a single column when writing; then reading just the first row already gives us the field names, and we can assemble the StructFields from it. This approach is more flexible!
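A minimal sketch of that idea in Scala, assuming each row was written into a single hypothetical "cf:json" column as one JSON object; using Jackson (which ships with Spark) for parsing is my own choice, not something fixed by the article:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._

// Sketch: every row was saved as one JSON string in the hypothetical "cf:json" column
val jsonRDD = hBaseRDD.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("json")))
}

// The first row is enough to recover the field names and assemble the schema
val firstJson = jsonRDD.first()
val fieldNames = new ObjectMapper().readTree(firstJson).fieldNames().asScala.toList
val schema = StructType(fieldNames.map(name => StructField(name, StringType, nullable = true)))

// Turn every JSON row into a Row that follows the recovered schema
val rowRDD = jsonRDD.mapPartitions { it =>
  val mapper = new ObjectMapper()   // one parser per partition
  it.map { json =>
    val node = mapper.readTree(json)
    Row.fromSeq(fieldNames.map(name => if (node.has(name)) node.get(name).asText() else null))
  }
}
val hbaseDataset = spark.createDataFrame(rowRDD, schema)

Since each value is a plain JSON string, spark.read.json on a Dataset[String] built from jsonRDD could also infer the schema automatically; assembling the StructFields by hand as above just keeps explicit control over the columns.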