spark sql字段类型MapType和ArrayType

2021-03-08 16:44:31 | 编辑

1.ArrayType

之前使用spark读取mongo解决嵌套数据有用到ArrayType的案例

读取mongo嵌套

    // Schema for the nested Mongo document: "subjectiveList" is an array of
    // question structs, and each question may carry its own nested array of
    // per-sub-question fast marks.
    val fastMarkItem = StructType(Array(
      StructField("subQuestionNo", StringType),
      StructField("score", DoubleType)
    ))
    val subjectiveItem = StructType(Array(
      StructField("questionNo", StringType),
      StructField("score", DoubleType),
      StructField("isEffective", BooleanType),
      StructField("fastMark", ArrayType(fastMarkItem))
    ))
    val schema = StructType(Array(
      StructField("subjectiveList", ArrayType(subjectiveItem)),
      StructField("studentId", StringType),
      StructField("classId", StringType)
    ))

    // Load the collection from MongoDB with the explicit schema (avoids
    // connector schema inference). SplitVector partitioning with 32 MB
    // chunks controls the read parallelism.
    val mongoReader = spark.read
      .format("com.mongodb.spark.sql")
      .schema(schema)
      .option("spark.mongodb.input.uri", mongoUri)
      .option("spark.mongodb.input.partitioner", "MongoSplitVectorPartitioner")
      .option("spark.mongodb.input.partitionerOptions.partitionSizeMB", 32)
    mongoReader.load()

提取嵌套数组到上层

// Keep every existing column and append one row per array element as a new
// struct column "Info", then lift a nested field to the top level.
// NOTE(review): the original snippet mixed three DataFrame names
// (Rdd / questionsRdd / objectiveQuestionRdd) and exploded "objectiveList"
// although the schema above defines "subjectiveList"; unified here —
// confirm the intended column name against the actual dataset.
questionsRdd
  .select((questionsRdd.schema.fieldNames.map(f => questionsRdd(f)) :+ explode($"subjectiveList").as("Info")): _*)
  .withColumn("no", $"Info"("questionNo"))

2.MapType

MapType简单例子

scala> import spark.implicits._
import spark.implicits._
scala> val ds = Seq(
     |   (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
     |   (2, Map("foo" -> (3, "c"))),
     |   (3, Map("bar" -> (4, "d")))
     | ).toDF("id", "alpha")
ds: org.apache.spark.sql.DataFrame = [id: int, alpha: map<string,struct<_1:int,_2:string>>]

scala> ds.printSchema
root
 |-- id: integer (nullable = false)
 |-- alpha: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)


scala> ds.show()
+---+--------------------+
| id|               alpha|
+---+--------------------+
|  1|Map(foo -> [1,a],...|
|  2|   Map(foo -> [3,c])|
|  3|   Map(bar -> [4,d])|
+---+--------------------+


scala> ds.select($"alpha['bar']").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`alpha['bar']`' given input columns: [id, alpha];;
'Project ['alpha['bar']]
+- Project [_1#62 AS id#65, _2#63 AS alpha#66]
   +- LocalRelation [_1#62, _2#63]

  ......

scala> ds.select($"alpha")("bar").show()
&lt;console&gt;:38: error: value show is not a member of org.apache.spark.sql.Column
       ds.select($"alpha")("bar").show()
                                  ^

scala> ds.select($"alpha"("bar")).show()
+----------+
|alpha[bar]|
+----------+
|     [2,b]|
|      null|
|     [4,d]|
+----------+
scala> ds.select($"alpha"["bar"]).show()
&lt;console&gt;:1: error: identifier expected but string literal found.
ds.select($"alpha"["bar"]).show()
                   ^
scala> ds.select($"alpha.bar").show()
+-----+
|  bar|
+-----+
|[2,b]|
| null|
|[4,d]|
+-----+

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号