Example: Reading HBase into a Spark RDD and Converting It to a Dataset

spark | 2019-09-13 10:02:39

1. Reading HBase into a Spark RDD
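
The snippets below are fragments, not a complete class. They assume a Spark 2.x / HBase 1.x classpath; roughly these imports are needed (other HBase versions relocate ProtobufUtil and drop the bundled Base64 helper):

        import java.util.ArrayList;
        import java.util.List;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.client.Result;
        import org.apache.hadoop.hbase.client.Scan;
        import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
        import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
        import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
        import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
        import org.apache.hadoop.hbase.util.Base64;
        import org.apache.hadoop.hbase.util.Bytes;
        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.RowFactory;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.sql.types.DataTypes;
        import org.apache.spark.sql.types.StructField;
        import org.apache.spark.sql.types.StructType;
        import scala.Tuple2;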

        System.setProperty("hadoop.home.dir", "E:\\02-hadoop\\hadoop-2.7.3\\");
        System.setProperty("HADOOP_USER_NAME", "root"); 
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        SparkSession spark=SparkSession.builder()  
                .appName("lcc_java_read_hbase_register_to_table")  
                .master("local[*]")  
                .getOrCreate();  
        JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
        Configuration configuration = HBaseConfiguration.create();  
        configuration.set("hbase.zookeeper.property.clientPort", "2181");  
        configuration.set("hbase.zookeeper.quorum", "192.168.10.82");  
        //configuration.set("hbase.master", "192.168.10.82:60000");  
        Scan scan = new Scan();
        String tableName = "test_lcc_person";
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String ScanToString = Base64.encodeBytes(proto.toByteArray());
        configuration.set(TableInputFormat.SCAN, ScanToString);
        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = context.newAPIHadoopRDD(configuration,TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
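
An unrestricted Scan, as above, ships every column of every row through the region servers to Spark. Since this example only ever reads the lcc_liezu family, it is usually worth narrowing and tuning the Scan before serializing it. A minimal sketch against the table above; the caching value is illustrative, not a recommendation:

        Scan scan = new Scan();
        // Only read the one column family this example uses.
        scan.addFamily(Bytes.toBytes("lcc_liezu"));
        // Fetch more rows per RPC round trip.
        scan.setCaching(500);
        // Block caching is normally disabled for full-table batch scans.
        scan.setCacheBlocks(false);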


2. Converting the RDD to a Dataset and Running Spark SQL

        JavaRDD<Row> personsRDD = myRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
            @Override
            public Row call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                System.out.println("====tuple==========" + tuple);
                Result result = tuple._2();
                // Pull the row key and the three columns out of the HBase Result.
                String rowkey = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("name")));
                String sex = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("sex")));
                String age = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("age")));
                // RowFactory.create already returns a Row, so no cast is needed.
                return RowFactory.create(rowkey, name, sex, age);
            }
        });

        // Build an explicit schema: four nullable string columns.
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(structFields);

        Dataset<Row> stuDf = spark.createDataFrame(personsRDD, schema);
        //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");
        stuDf.printSchema();

        // Register the Dataset as a temp view and query it with Spark SQL.
        stuDf.createOrReplaceTempView("Person");
        Dataset<Row> nameDf = spark.sql("select * from Person");
        nameDf.show();
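
As an alternative to building a StructType by hand, each Result can be mapped to a JavaBean and the schema inferred from the bean's getters. A minimal sketch, assuming a hypothetical serializable Person bean that is not part of the original code:

        public static class Person implements java.io.Serializable {
            private String id, name, sex, age;
            public String getId()   { return id; }
            public void setId(String v)   { this.id = v; }
            public String getName() { return name; }
            public void setName(String v) { this.name = v; }
            public String getSex()  { return sex; }
            public void setSex(String v)  { this.sex = v; }
            public String getAge()  { return age; }
            public void setAge(String v)  { this.age = v; }
        }

        // Spark derives the schema from the bean's getters.
        JavaRDD<Person> beanRDD = myRDD.map(tuple -> {
            Result result = tuple._2();
            Person p = new Person();
            p.setId(Bytes.toString(result.getRow()));
            p.setName(Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("name"))));
            p.setSex(Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("sex"))));
            p.setAge(Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("age"))));
            return p;
        });
        Dataset<Row> beanDf = spark.createDataFrame(beanRDD, Person.class);

Note that with bean inference Spark orders the columns alphabetically by property name, whereas the explicit schema above preserves the order in which the fields were listed.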


3. Output

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
====tuple==========(31,keyvalues={1/lcc_liezu:age/1502162685488/Put/vlen=2/seqid=0, 1/lcc_liezu:id/1502763623953/Put/vlen=1/seqid=0, 1/lcc_liezu:name/1502162660029/Put/vlen=10/seqid=0, 1/lcc_liezu:sex/1502162685434/Put/vlen=3/seqid=0})
====tuple==========(32,keyvalues={2/lcc_liezu:age/1502162685588/Put/vlen=2/seqid=0, 2/lcc_liezu:id/1502763623989/Put/vlen=1/seqid=0, 2/lcc_liezu:name/1502162685520/Put/vlen=10/seqid=0, 2/lcc_liezu:sex/1502162685556/Put/vlen=3/seqid=0})
====tuple==========(33,keyvalues={3/lcc_liezu:age/1502162685710/Put/vlen=2/seqid=0, 3/lcc_liezu:id/1502763624011/Put/vlen=1/seqid=0, 3/lcc_liezu:name/1502162685628/Put/vlen=10/seqid=0, 3/lcc_liezu:sex/1502162685659/Put/vlen=3/seqid=0})
====tuple==========(34,keyvalues={4/lcc_liezu:age/1502162685803/Put/vlen=2/seqid=0, 4/lcc_liezu:id/1502763624039/Put/vlen=1/seqid=0, 4/lcc_liezu:name/1502162685732/Put/vlen=10/seqid=0, 4/lcc_liezu:sex/1502162685762/Put/vlen=3/seqid=0})
====tuple==========(35,keyvalues={5/lcc_liezu:age/1502162685904/Put/vlen=2/seqid=0, 5/lcc_liezu:id/1502763624068/Put/vlen=1/seqid=0, 5/lcc_liezu:name/1502162685825/Put/vlen=10/seqid=0, 5/lcc_liezu:sex/1502162685861/Put/vlen=3/seqid=0})
====tuple==========(36,keyvalues={6/lcc_liezu:age/1502162687751/Put/vlen=2/seqid=0, 6/lcc_liezu:id/1502763624087/Put/vlen=1/seqid=0, 6/lcc_liezu:name/1502162685940/Put/vlen=10/seqid=0, 6/lcc_liezu:sex/1502162685985/Put/vlen=3/seqid=0})
====tuple==========(37,keyvalues={7/lcc_liezu:age/1502437506555/Put/vlen=2/seqid=0, 7/lcc_liezu:id/1502763627974/Put/vlen=1/seqid=0, 7/lcc_liezu:name/1502437505073/Put/vlen=10/seqid=0, 7/lcc_liezu:sex/1502437505105/Put/vlen=3/seqid=0})
+---+----+---+---+
| id|name|sex|age|
+---+----+---+---+
|  1|梁川川1|  男| 12|
|  2|梁川川2|  男| 12|
|  3|梁川川3|  男| 12|
|  4|梁川川4|  男| 12|
|  5|梁川川5|  男| 12|
|  6|梁川川6|  男| 12|
|  7|梁川川7|  男| 17|
+---+----+---+---+

