spark rdd转DataFrame构造字段信息元数据

scala | 2019-09-13 10:02:39

很多时候我们的计算中map操作会生成新的Rdd,那么rdd需要转dataset或者dataframe,那么久需要手动构造字段元数据,然后才能转成dataset或者dataframe。

SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlcontext = new SQLContext(sc);
/**
* 在RDD的基础上创建类型为Row的RDD
*/
JavaRDD<String> lines = sc.textFile("Peoples.txt");
JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
 
@Override
public Row call(String line) throws Exception {
String[] split = line.split(",");
return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2]));
}
});
/**
* 动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB
*/
ArrayList<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
//构建StructType,用于最后DataFrame元数据的描述
StructType schema = DataTypes.createStructType(structFields);
/**
* 基于已有的MetaData以及RDD<Row> 来构造DataFrame
*/
DataFrame df = sqlcontext.createDataFrame(rowRDD, schema);
/**
*注册成为临时表以供后续的SQL操作查询
*/
df.registerTempTable("persons");
/**
* 进行数据的多维度分析
*/
DataFrame result = sqlcontext.sql("select * from persons where age > 7");
result.show();
 
/**
* 对结果进行处理,包括由DataFrame转换成为RDD<Row>
*/
List<Row> listRow = result.javaRDD().collect();
for (Row row : listRow) {
System.out.println(row);
}

登录后即可回复 登录 | 注册
    
关注编程学问公众号