Spark 读取 zip 压缩文件中 JSON 文本的示例代码
/** Schema applied to each JSON record parsed by `readExamRecordZipJson`.
  *
  * Top-level fields:
  *  - `objectiveList`:  array of { questionNo: String, fastMark: [...] }
  *  - `subjectiveList`: array of { score: Double, fastMark: [...] }
  *  - `studentId`:      String
  *
  * Each `fastMark` element is { subNo: String, score: Double }.
  */
private val scoreFileSchema: StructType = {
  // Element layout is the same for the objective and subjective lists,
  // so it is declared once and reused.
  val fastMarkArray = ArrayType(
    new StructType()
      .add("subNo", StringType)
      .add("score", DoubleType)
  )

  val objectiveItem = new StructType()
    .add("questionNo", StringType)
    .add("fastMark", fastMarkArray)

  val subjectiveItem = new StructType()
    .add("score", DoubleType)
    .add("fastMark", fastMarkArray)

  new StructType()
    .add("objectiveList", ArrayType(objectiveItem))
    .add("subjectiveList", ArrayType(subjectiveItem))
    .add("studentId", StringType)
}
/** Reads line-delimited JSON stored inside zip archives into a DataFrame.
  *
  * Every file returned by `SparkContext.binaryFiles(filePath)` is opened as a
  * zip archive. Each line of each archive entry is treated as one JSON
  * document and parsed with `scoreFileSchema`.
  *
  * Entry contents are decoded as UTF-8 rather than with the JVM default
  * charset, so the result does not depend on executor configuration.
  *
  * @param spark    session used to read the files and build the result
  * @param filePath path handed to `SparkContext.binaryFiles`
  * @return a DataFrame whose columns are those declared by `scoreFileSchema`
  */
def readExamRecordZipJson(spark: SparkSession, filePath: String): DataFrame = {
  val archives = spark.sparkContext.binaryFiles(filePath)
  val jsonLines = archives.flatMap { case (_, content) =>
    val zis = new ZipInputStream(content.open())
    // Iterator instead of Stream: lines are handed downstream one at a time
    // and not memoized, so a large archive is never held in memory as a whole.
    val lines = Iterator
      .continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .flatMap { _ =>
        // The reader is intentionally not closed per entry: closing it would
        // close `zis` and make the remaining entries unreadable.
        val reader = new BufferedReader(
          new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
        Iterator.continually(reader.readLine()).takeWhile(_ != null)
      }
    // `++` takes its argument by name, so the block below runs only after every
    // line has been consumed; it releases the zip stream (and the underlying
    // stream opened above) and contributes no extra elements.
    lines ++ { zis.close(); Iterator.empty }
  }
  spark.read.schema(scoreFileSchema).json(spark.createDataset(jsonLines)(Encoders.STRING))
}