Reading MySQL with Spark and then computing on the data is straightforward: a MySQL table is already two-dimensional, so once it is loaded you can query and analyze it with SQL directly. MongoDB, however, stores tree-shaped JSON documents, so after reading them you first have to flatten them into a two-dimensional relational table before any analysis can be done.
Reading MongoDB from Spark is itself quite simple: use MongoDB's official mongo-spark-connector. The example below shows how to read a MongoDB collection and flatten the nested documents, and demonstrates four different ways of doing it.
Reference: the official mongo-spark-connector documentation: https://docs.mongodb.com/spark-connector/current/
1. MongoDB data structure
The MongoDB collection looks like the following: the top level is a school, and each school contains its class information.
School A ID  School A name  School A address
------------Class 1 ID  Class 1 name  Class 1 grade
------------Class 2 ID  Class 2 name  Class 2 grade
School B ID  School B name  School B address
------------Class 1 ID  Class 1 name  Class 1 grade
------------Class 2 ID  Class 2 name  Class 2 grade
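For reference, a single document in the collection would then look roughly like this (a hypothetical example; the field names school, name, address and classs are assumptions chosen to match the example code in section 3):
{
  "school": "A-school-id",
  "name": "School A name",
  "address": "School A address",
  "classs": [
    { "class": "class-1-id", "name": "Class 1 name", "grade": "Grade 1" },
    { "class": "class-2-id", "name": "Class 2 name", "grade": "Grade 2" }
  ]
}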
2. Dependencies for reading MongoDB from Spark
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.itxw</groupId>
<artifactId>spark-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spark-demo</name>
<description>Demo project for spark</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.11.12</scala.version>
<spark.version>2.3.0</spark.version>
<hadoop.version>2.7.7</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.8.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.report.tool.datacenter.test.SparkDimRankTest</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
Note that the Spark and MongoDB dependencies use scope provided: they are only needed at compile time during development, and at runtime the jars already present on the cluster are used.
3. Example code for reading and flattening MongoDB data with Spark
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.{Encoders, Row, SparkSession}

import scala.collection.mutable

object SparkMongo {
  def main(args: Array[String]): Unit = {
    try {
      // the first program argument switches between development and cluster mode
      val isDev = if (args.length >= 1) args(0) == "dev" else false
      // initialize the SparkSession
      val spark = if (isDev) {
        System.setProperty("hadoop.home.dir", "D:\\bigdata\\hadoop-2.7.3")
        SparkSession.builder().appName("Sparkdemo")
          //.config("spark.some.config.option", "some-value")
          .config("spark.debug.maxToStringFields", "100")
          .master("local[*]")
          //.enableHiveSupport()
          .getOrCreate()
      } else {
        SparkSession.builder().appName("spark-demo")
          //.config("spark.some.config.option", "some-value")
          .enableHiveSupport()
          .getOrCreate()
      }
      // "data" is the database name, "list" is the collection name
      val schoolRdd = spark.read.format("com.mongodb.spark.sql")
        .option("spark.mongodb.input.uri", "mongodb://192.168.1.19:23000/data.list")
        .option("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
        .option("spark.mongodb.input.partitionerOptions.shardkey", "_id")
        .load()
      // Method 1: Spark SQL with explode()
      schoolRdd.createOrReplaceTempView("school_list_tmp")
      var classRdd = spark.sql("select school schoolId,name schoolName,explode(classs) classs from school_list_tmp")
      classRdd.createOrReplaceTempView("class_list_tmp")
      classRdd = spark.sql("select schoolId,schoolName,classs.class classId,classs.name className from class_list_tmp")
      // Method 2: DataFrame API with the explode() function
      // var classRdd = schoolRdd.select(schoolRdd("school").as("schoolId"), schoolRdd("name").as("schoolName"), explode(schoolRdd("classs")).as("class"))
      // classRdd = classRdd.select(classRdd("schoolId"), classRdd("schoolName"), classRdd("class")("class").as("classId"), classRdd("class")("name").as("className"))
      // Method 3: mapPartitions, map each school row to its nested class rows, then flatten
      // var classRdd = schoolRdd.mapPartitions(rows => {
      //   rows.map(row => {
      //     val classRows = row.getAs[mutable.WrappedArray[Row]]("classs")
      //     classRows.map(classRow => {
      //       (row.getAs[String]("school"), row.getAs[String]("name"), classRow.getAs[String]("class"), classRow.getAs[String]("name"))
      //     })
      //   }).flatten
      // })(Encoders.tuple(Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)).toDF("schoolId", "schoolName", "classId", "className")
      // Method 4: mapPartitions + flatMap, the same technique as method 3, shown here on a
      // collection one level up: projectRdd would be a DataFrame of project documents, each
      // of which contains a "schools" array
      // var flatSchoolRdd = projectRdd.mapPartitions(rows => {
      //   rows.flatMap(row => {
      //     val schoolRows = row.getAs[mutable.WrappedArray[Row]]("schools")
      //     schoolRows.map(schoolRow => {
      //       (row.getAs[String]("project"), row.getAs[String]("name"), schoolRow.getAs[String]("school"), schoolRow.getAs[String]("name"))
      //     })
      //   })
      // })(Encoders.tuple(Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)).toDF("projectId", "projectName", "schoolId", "schoolName")
      spark.close()
    } catch {
      case e: Exception =>
        Logger.getLogger(SparkMongo.getClass).error("data analysis job failed", e)
        throw e
    }
  }
}
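Once the nested documents have been flattened into classRdd, ordinary SQL works on it just as it would on a MySQL table. For example, placed before spark.close(), something like the following would count the classes per school (a minimal sketch; the aggregation itself is only illustrative):
classRdd.createOrReplaceTempView("class_flat")
// count how many classes each school has in the flattened table
val classCountRdd = spark.sql("select schoolId, schoolName, count(1) classCount from class_flat group by schoolId, schoolName")
classCountRdd.show()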
Appendix:
Spark explode: roughly comparable to PHP's explode, it splits one row into several rows, one per element of the nested array column; in that sense it plays the same role as flatten, turning structured (nested) data into a flat form.
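A minimal standalone sketch of explode (hypothetical in-memory data, not the collection above):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

val spark = SparkSession.builder().appName("explode-demo").master("local[*]").getOrCreate()
import spark.implicits._
// one row whose "classs" column holds an array with two elements
val df = Seq(("schoolA", Seq("class1", "class2"))).toDF("school", "classs")
// explode produces one output row per array element:
// (schoolA, class1) and (schoolA, class2)
df.select($"school", explode($"classs").as("classId")).show()
spark.stop()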
Spark flatMap: flatMap is essentially map followed by flatten: each element is mapped to a collection, and the resulting collections are then flattened into a single one. Method 3 above does the same thing explicitly, calling map first and then flatten, and achieves the same result.
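The relationship is easy to see on plain Scala collections (the same holds for flatMap on Spark RDDs and Datasets); a small sketch:
val nested = List(List(1, 2), List(3, 4))
// map first, then flatten: List(List(2, 4), List(6, 8)) becomes List(2, 4, 6, 8)
val mappedThenFlattened = nested.map(xs => xs.map(_ * 2)).flatten
// flatMap does the map and the flatten in one step and gives the same result
val flatMapped = nested.flatMap(xs => xs.map(_ * 2))
assert(mappedThenFlattened == flatMapped) // both are List(2, 4, 6, 8)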