To save data to HBase from Spark, you first need to create the HBase table from Spark itself; this works differently from connecting to a MySQL database and running SQL statements.
Here is the Spark code:
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.spark.sql.SparkSession

// Build the SparkSession
val spark = SparkSession.builder()
  .appName("testSpark")
  .config("spark.some.config.option", "some-value")
  .config("spark.hadoop.validateOutputSpecs", false)
  .enableHiveSupport()
  .getOrCreate()
val sc = spark.sparkContext

/*
 Another way to create the admin:
 val hbaseConf = HBaseConfiguration.create()
 hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "master,slave1,slave2,slave3,slave4")
 hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hb_itxw")
 hbaseConf.set(TableInputFormat.INPUT_TABLE, "hb_itxw")
 val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
 val admin = hbaseConn.getAdmin
*/

// HBase properties
sc.hadoopConfiguration.set(HConstants.ZOOKEEPER_QUORUM, "master,slave1,slave2,slave3,slave4")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "hb_itxw")
sc.hadoopConfiguration.set(TableInputFormat.INPUT_TABLE, "hb_itxw")

// Connect to HBase
val hbaseConn = ConnectionFactory.createConnection(sc.hadoopConfiguration)
val admin = hbaseConn.getAdmin

// Create the table if it does not exist
if (!admin.tableExists(TableName.valueOf("hb_itxw"))) {
  val desc = new HTableDescriptor(TableName.valueOf("hb_itxw"))
  // Only the column family needs to be declared; columns (qualifiers) are created when data is written
  val hcd = new HColumnDescriptor("cf")
  desc.addFamily(hcd)
  admin.createTable(desc)
}
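Once the table exists, the same hadoopConfiguration (which already carries TableOutputFormat.OUTPUT_TABLE) can be reused to actually save data into it. The sketch below is not part of the original code: the sample rows and the column qualifiers "name" and "url" are hypothetical, and it assumes the SparkContext sc and the "cf" column family created above.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// Build a Hadoop Job so the output format and output table name travel in the job configuration
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Hypothetical sample data: (rowKey, name, url)
val rows = sc.parallelize(Seq(
  ("row1", "itxw", "http://itxw.net"),
  ("row2", "spark", "http://spark.apache.org")
))

// Turn each record into a Put against the "cf" column family created above
val puts = rows.map { case (rowKey, name, url) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("url"), Bytes.toBytes(url))
  (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
}

// Write the pairs to HBase through the MapReduce output format
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

saveAsNewAPIHadoopDataset only needs the job's Configuration; since hb_itxw was already set as the output table on sc.hadoopConfiguration, no extra table wiring is required here.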