Spark读取ZIP压缩文件中的JSON文本

spark | 2021-04-22 09:46:55

Spark读取ZIP压缩文件中JSON文本的示例代码（假设ZIP中的每个文件都是JSON Lines格式，即每行一个JSON对象）：

private val scoreFileSchema: StructType = {
    // Array of {subNo, score} structs; the same shape appears under both
    // "objectiveList" and "subjectiveList", so it is built once and reused.
    val fastMarkType = ArrayType(
      new StructType()
        .add("subNo", StringType)
        .add("score", DoubleType)
    )

    // Element type of "objectiveList".
    val objectiveEntry = new StructType()
      .add("questionNo", StringType)
      .add("fastMark", fastMarkType)

    // Element type of "subjectiveList".
    val subjectiveEntry = new StructType()
      .add("score", DoubleType)
      .add("fastMark", fastMarkType)

    // Top-level record: two arrays of per-question structs plus the student identifier.
    new StructType()
      .add("objectiveList", ArrayType(objectiveEntry))
      .add("subjectiveList", ArrayType(subjectiveEntry))
      .add("studentId", StringType)
  }


  /**
   * Reads JSON records that are stored as text lines inside ZIP archive(s).
   *
   * Every entry of every archive matched by `filePath` is decoded as UTF-8 and split into
   * lines. Each line is handed to Spark's JSON reader as one record and parsed with
   * `scoreFileSchema`.
   *
   * @param spark    session used to read the archives and build the result
   * @param filePath location of the ZIP file(s), passed unchanged to `SparkContext.binaryFiles`
   * @return a DataFrame whose columns follow `scoreFileSchema`
   */
  def readExamRecordZipJson(spark: SparkSession, filePath: String): DataFrame = {
    val jsonLines = spark.sparkContext.binaryFiles(filePath).flatMap { case (_, content) =>
      // open() returns a fresh stream that the caller must close; it is closed below once drained.
      val zis = new ZipInputStream(content.open())
      // Iterator rather than the memoizing (2.13-deprecated) Stream. getNextEntry is only
      // invoked after the previous entry's lines are exhausted, so each reader sees one entry.
      val entryLines = Iterator.continually(zis.getNextEntry).takeWhile(_ != null).flatMap { _ =>
        // Decode explicitly as UTF-8 instead of the executor JVM's platform-default charset.
        val reader =
          new BufferedReader(new InputStreamReader(zis, java.nio.charset.StandardCharsets.UTF_8))
        Iterator.continually(reader.readLine()).takeWhile(_ != null)
      }
      // `++` evaluates its argument lazily, so the archive is closed right after the last
      // line is consumed. NOTE(review): if iteration stops early (task killed, limit pushdown)
      // the stream is not closed here; a TaskContext completion listener would cover that case.
      entryLines ++ { zis.close(); Iterator.empty }
    }
    spark.read.schema(scoreFileSchema).json(spark.createDataset(jsonLines)(Encoders.STRING))
  }

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号