Several ways to read MongoDB with Spark and flatten nested documents

spark | 2020-07-21 17:02:16

    Reading MySQL with Spark and then computing on the result is simple, because a MySQL table is already a flat, two-dimensional relation: once loaded, you can query and aggregate it directly with SQL. MongoDB, however, stores tree-shaped JSON documents, so after reading them you first have to flatten the nested structure into a two-dimensional relational table before you can analyze it.

    Reading MongoDB from Spark is actually quite simple using MongoDB's official mongo-spark-connector. The example below shows how to read a MongoDB collection and flatten its nested data, and demonstrates four different approaches.

   Reference: the official mongo-spark-connector documentation: https://docs.mongodb.com/spark-connector/current/

1. MongoDB data structure

The MongoDB collection looks like this: the top level is a school, and each school contains its class information. A sample document follows the outline below.

School A ID   School A name   School A address
------------Class 1 ID   Class 1 name   Class 1 grade
------------Class 2 ID   Class 2 name   Class 2 grade
School B ID   School B name   School B address
------------Class 1 ID   Class 1 name   Class 1 grade
------------Class 2 ID   Class 2 name   Class 2 grade
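
To make the structure concrete, a single school document might look roughly like the following. The field names school, name and classs match those referenced in the example code in section 3; _id, address and grade are assumed names for the remaining fields.

{
  "_id": ObjectId("..."),
  "school": "school_A_id",
  "name": "School A name",
  "address": "School A address",
  "classs": [
    { "class": "class_1_id", "name": "Class 1 name", "grade": "Class 1 grade" },
    { "class": "class_2_id", "name": "Class 2 name", "grade": "Class 2 grade" }
  ]
}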

2. Maven dependencies for reading MongoDB from Spark

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.itxw</groupId>
    <artifactId>spark-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spark-demo</name>
    <description>Demo project for spark</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.8.0</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.5</arg>
                        <arg>-dependencyfile</arg>
                        <arg>${project.build.directory}/.scala_dependencies</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.report.tool.datacenter.test.SparkDimRankTest</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>

</project>

Note that the Spark and MongoDB dependencies should be declared with scope provided: they are only needed at compile time during development, and at runtime the jars already present on the cluster are used.
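
For example, the mongo-spark-connector dependency would then be declared as follows; the same scope element applies to the spark-core, spark-sql and mongo-java-driver entries (the pom above omits it, this is just a sketch of the note):

<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>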

3. Example code: reading and flattening MongoDB data with Spark

import org.apache.log4j.Logger
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.explode

import scala.collection.mutable

object SparkMongo {

  def main(args: Array[String]): Unit = {

    try {
      // first argument "dev" switches to local development mode
      val isDev = if (args.length >= 1) args(0) == "dev" else false

      // initialize the SparkSession
      val spark = if (isDev) {
        System.setProperty("hadoop.home.dir", "D:\\bigdata\\hadoop-2.7.3")

        SparkSession.builder().appName("Sparkdemo")
          //.config("spark.some.config.option", "some-value")
          .config("spark.debug.maxToStringFields", "100")
          .master("local[*]")
          //.enableHiveSupport()
          .getOrCreate()
      } else {
        SparkSession.builder().appName("spark-demo")
          //.config("spark.some.config.option", "some-value")
          .enableHiveSupport()
          .getOrCreate()
      }

      // "data" is the database name, "list" is the collection name
      var schoolRdd = spark.read.format("com.mongodb.spark.sql")
        .option("spark.mongodb.input.uri", "mongodb://192.168.1.19:23000/data.list")
        .option("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
        .option("spark.mongodb.input.partitionerOptions.shardkey", "_id")
        .load()

      // Method 1: register a temp view and flatten the nested "classs" array with SQL explode
      schoolRdd.createOrReplaceTempView("school_list_tmp")
      var classRdd = spark.sql("select school schoolId,name schoolName,explode(classs) classs from school_list_tmp")
      classRdd.createOrReplaceTempView("class_list_tmp")
      classRdd = spark.sql("select schoolId,schoolName,classs.class classId,classs.name className from class_list_tmp")
      // classRdd now holds one row per class: schoolId, schoolName, classId, className

      // Method 2: the same flattening with the DataFrame API and the explode function
//      var classRdd = schoolRdd.select(schoolRdd("school").as("schoolId"), schoolRdd("name").as("schoolName"), explode(schoolRdd("classs")).as("class"))
//      classRdd = classRdd.select(classRdd("schoolId"), classRdd("schoolName"), classRdd("class")("class").as("classId"), classRdd("class")("name").as("className"))

      // Method 3: mapPartitions, map each school row to its class rows, then flatten
//      var classRdd = schoolRdd.mapPartitions(rows => {
//        rows.map(row => {
//          var classRows = row.getAs[mutable.WrappedArray[Row]]("classs")
//          classRows.map(classRow => {
//            (row.getAs[String]("school"), row.getAs[String]("name"), classRow.getAs[String]("class"), classRow.getAs[String]("name"))
//          })
//        }).flatten
//      })(Encoders.tuple(Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)).toDF("schoolId", "schoolName", "classId", "className")

      // Method 4: mapPartitions + flatMap; same idea as method 3, shown on a differently shaped
      // collection, assuming projectRdd was loaded from project documents that contain a "schools" array
//      var schoolRdd2 = projectRdd.mapPartitions(rows => {
//        rows.flatMap(row => {
//          var schoolRows = row.getAs[mutable.WrappedArray[Row]]("schools")
//          schoolRows.map(schoolRow => {
//            (row.getAs[String]("project"), row.getAs[String]("name"), schoolRow.getAs[String]("school"), schoolRow.getAs[String]("name"))
//          })
//        })
//      })(Encoders.tuple(Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)).toDF("projectId", "projectName", "schoolId", "schoolName")

      spark.close()
    } catch {
      case e: Exception => {
        Logger.getLogger(SparkMongo.getClass).error("Error while computing and analyzing the data", e)
        throw e
      }
    }
  }
}

 

Appendix:

Spark explode: loosely analogous to PHP's explode in that it unpacks one value into many, but here it operates on an array column: each input row is split into as many output rows as the array has elements. Like flatten, it turns nested, structured data into a flat shape.

Spark flatMap: flatMap is map followed by flatten, that is, each input element is mapped to a collection and the resulting collections are then flattened into a single sequence. Method 3 above spells this out explicitly with map and then flatten, while method 4 uses flatMap directly; both produce the same result. A small sketch of both operations follows.
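
A minimal, self-contained sketch of explode and flatMap on a toy dataset; the object, column and value names here are invented for illustration and assume a local SparkSession:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodeFlatMapDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explode-flatMap-demo").master("local[*]").getOrCreate()
    import spark.implicits._

    // toy nested data: one element per school, each with an array of class ids
    val schools = Seq(
      ("schoolA", Seq("class1", "class2")),
      ("schoolB", Seq("class3"))
    )

    // explode: one output row per element of the array column
    val schoolDf = schools.toDF("schoolId", "classs")
    schoolDf.select($"schoolId", explode($"classs").as("classId")).show()

    // flatMap: map each element to a collection, the results are flattened automatically
    val schoolDs = schools.toDS()
    schoolDs.flatMap { case (schoolId, classs) => classs.map(c => (schoolId, c)) }
      .toDF("schoolId", "classId")
      .show()

    spark.stop()
  }
}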

 

    
  • Reader comment from houyong:

    spark.read can take an explicit schema, so you can declare up front only the fields you need and read just those; for MongoDB you can likewise describe the document structure as a schema (a usage sketch follows the definition below).

import org.apache.spark.sql.types._

val schema = StructType(Array(
  // nested array field: each element is a struct with two child fields
  StructField("list",
    ArrayType(StructType(Array(
      StructField("childField1", StringType),
      StructField("childField2", StringType, true)
    )))
  ),
  StructField("studentId", StringType),
  StructField("studentName", StringType)
))
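
A minimal usage sketch, assuming the schema above and the same SparkSession and connection settings as in section 3: passing the schema to spark.read means only the declared fields are parsed, instead of relying on schema inference.

// hypothetical read that applies the explicit schema to the Mongo collection
val studentDf = spark.read.format("com.mongodb.spark.sql")
  .schema(schema)
  .option("spark.mongodb.input.uri", "mongodb://192.168.1.19:23000/data.list")
  .load()

studentDf.printSchema()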

     
