很多时候我们的计算中map操作会生成新的Rdd,那么rdd需要转dataset或者dataframe,那么久需要手动构造字段元数据,然后才能转成dataset或者dataframe。
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlcontext = new SQLContext(sc); /** * 在RDD的基础上创建类型为Row的RDD */ JavaRDD<String> lines = sc.textFile("Peoples.txt"); JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() { /** * */ private static final long serialVersionUID = 1L; @Override public Row call(String line) throws Exception { String[] split = line.split(","); return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2])); } }); /** * 动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB */ ArrayList<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); //构建StructType,用于最后DataFrame元数据的描述 StructType schema = DataTypes.createStructType(structFields); /** * 基于已有的MetaData以及RDD<Row> 来构造DataFrame */ DataFrame df = sqlcontext.createDataFrame(rowRDD, schema); /** *注册成为临时表以供后续的SQL操作查询 */ df.registerTempTable("persons"); /** * 进行数据的多维度分析 */ DataFrame result = sqlcontext.sql("select * from persons where age > 7"); result.show(); /** * 对结果进行处理,包括由DataFrame转换成为RDD<Row> */ List<Row> listRow = result.javaRDD().collect(); for (Row row : listRow) { System.out.println(row); }