Spark
1.介绍
1.1批处理
Apache Spark™ is a unified analytics engine for large-scale data processing.(分布式批处理引擎)
spark Streaming 微批处理 的 流式计算
flink基于事件驱动
spark对比mapreduce的区别 基于本地数据加载 基于内存计算
2.集群模式
2.1集群模式
spark Standalone
spark on yarn
2.2 集群搭建
Apache(hadoop+spark)
CDH
Ambari
4.概念组件
4.1 组件
Spark Core 底层计算交互,将代码转换成计算任务,完成任务调度
Spark sql (jpa代码 和sql)
spark Streaming
MLlib 机器学习
GraphX 图形计算
4.2术语概念
Worker
Instance
Application
driver
executor
rdd(Resilient Distributed Datasets 弹性分布式数据集)(dataset,dataframe)
DAG(Directed Acyclic Graph):有向无环图
Job (一个action)
Stages
task
transformation算子 (lazy操作)
action算子
5.案例
5.1开发环境
安装scala插件
然后就可以新建scala类
新建一个scala项目选择scala
下载使用sdk
5.2开发案例(maven)
scala案例代码
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
/**
* spark demo 计算所有学校总得分的排名
*/
//class定义类和java一样要new才能调用,object定义类相当于static 单例 可以直接调用,case class自动构造方法和get set方法
object SparkMain {
/**
*
* @param args 参数0-{取多少条数据}
*/
def main(args: Array[String]): Unit = {
try {
//获取参数 开发模式
val isDev = if (args.length >= 1) args(0) == "dev" else false
//初始化spark
val spark = if (isDev) {
System.setProperty("hadoop.home.dir", "D:\\bigdata\\hadoop-2.7.3")
SparkSession.builder().appName("Sparkdemo")
//.config("spark.some.config.option", "some-value")
.config("spark.debug.maxToStringFields", "100")
.master("local[*]")
.enableHiveSupport()
.getOrCreate()
} else {
SparkSession.builder().appName("spark-demo")
//.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
}
//设置Checkpoint路径
spark.sparkContext.setCheckpointDir("hdfs://hadoopMaster:9000/tmp/engine/sparkdemo/")
//加载mysql数据 到rdd
var rdd=spark.read.format("jdbc")
.option("url", "jdbc:mysql://10.10.22.154:3306/datacenter")
.option("user", "root")
.option("password","znxunzhi")
.option("dbtable","subject_total_score")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound", 5000000)
.option("numPartitions", 1000)
.load()
rdd.persist()
//引入隐式表达式
import spark.sqlContext.implicits._
/***计算每个学校的总分的学校排名***/
//1.先计算总分
var dataframe=rdd.groupBy("SCHOOL_NAME").agg(sum("TOTAL_SCORE").as("SCHOOL_MAX_SCORE"))
//如果计算链很长
//dataframe.persist()
dataframe.checkpoint()
//dataframe.show(10)
//2.计算总分的排名
dataframe=dataframe.withColumn("SCHOOL_MAX_SCORE_RANK",rank().over(Window.partitionBy().orderBy($"SCHOOL_MAX_SCORE".desc)))
//保存计算结果
val connectionProperties: Properties = new Properties
connectionProperties.put("user", "root")
connectionProperties.put("password", "znxunzhi")
connectionProperties.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
dataframe.write.mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo1",connectionProperties)
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/***计算每个学校的最高分的学校排名 sql 方式***/
//先计算最高分
rdd.createOrReplaceTempView("temp_total_score")
val df1=rdd.sqlContext.sql("select SCHOOL_NAME,sum(TOTAL_SCORE) SCHOOL_MAX_SCORE from temp_total_score group by SCHOOL_NAME")
df1.createOrReplaceTempView("temp_total_score_max")
val df2=df1.sqlContext.sql("select SCHOOL_NAME,SCHOOL_MAX_SCORE,rank() over(ORDER BY SCHOOL_MAX_SCORE DESC) as SCHOOL_MAX_SCORE_RANK from temp_total_score_max")
//保存计算结果
val connectionProperties1: Properties = new Properties
connectionProperties1.put("user", "root")
connectionProperties1.put("password", "znxunzhi")
connectionProperties1.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo2",connectionProperties)
spark.close()
} catch {
case e: Exception => {
Logger.getLogger(SparkMain.getClass).error("数据计算分析异常", e)
throw e
}
}
}
}
java案例代码
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;
import java.util.Properties;
/**
* @Description:
* @Author: houyong
* @Date: 2020/4/8
*/
public class SparkMainJava {
public static void main(String[] args) {
try {
//获取参数 开发模式
boolean isDev =args.length >= 1&&args[0] == "dev"?true:false;
//初始化spark
SparkSession spark = null;
if (isDev) {
System.setProperty("hadoop.home.dir", "D:\\bigdata\\hadoop-2.7.3");
spark=SparkSession.builder().appName("Sparkdemo")
//.config("spark.some.config.option", "some-value")
.config("spark.debug.maxToStringFields", "100")
.master("local[*]")
.enableHiveSupport()
.getOrCreate();
} else {
spark=SparkSession.builder().appName("spark-demo")
//.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate();
}
//设置Checkpoint路径
spark.sparkContext().setCheckpointDir("hdfs://hadoopMaster:9000/tmp/engine/sparkdemo/");
//加载mysql数据 到rdd
Dataset rdd=spark.read().format("jdbc")
.option("url", "jdbc:mysql://10.10.22.154:3306/datacenter")
.option("user", "root")
.option("password","znxunzhi")
.option("dbtable","subject_total_score")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound", 5000000)
.option("numPartitions", 1000)
.load();
rdd.persist();
/***计算每个学校的最高分的学校排名***/
//1.先计算总分
Dataset dataframe=rdd.groupBy("SCHOOL_NAME").agg(functions.sum("TOTAL_SCORE").as("SCHOOL_MAX_SCORE"));
//如果计算链很长
//dataframe.persist()
//dataframe.checkpoint()
//dataframe.show(10)
//2.计算总分的排名
dataframe=dataframe.withColumn("SCHOOL_MAX_SCORE_RANK",functions.rank().over(Window.partitionBy().orderBy(functions.col("SCHOOL_MAX_SCORE").desc())));
//保存计算结果
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "znxunzhi");
connectionProperties.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;");
dataframe.write().mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo1",connectionProperties);
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/***计算每个学校的最高分的学校排名 sql 方式***/
//先计算最高分
rdd.createOrReplaceTempView("temp_total_score");
Dataset df1=rdd.sqlContext().sql("select SCHOOL_NAME,sum(TOTAL_SCORE) SCHOOL_MAX_SCORE from temp_total_score group by SCHOOL_NAME");
df1.createOrReplaceTempView("temp_total_score_max");
Dataset df2=df1.sqlContext().sql("select SCHOOL_NAME,SCHOOL_MAX_SCORE,rank() over(ORDER BY SCHOOL_MAX_SCORE DESC) as SCHOOL_MAX_SCORE_RANK from temp_total_score_max");
//保存计算结果
Properties connectionProperties1 = new Properties();
connectionProperties1.put("user", "root");
connectionProperties1.put("password", "znxunzhi");
connectionProperties1.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;");
df2.write().mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo2",connectionProperties);
spark.close();
} catch(Exception e){
e.printStackTrace();
}
}
}
任务提交方式:spark-submit命令,rest
hadoop dfs -put spark-demo-0.0.1-SNAPSHOT.jar /tmp/engine
spark-submit --name demoTest --master yarn --executor-cores 10 --num-executors 10 --executor-memory 8G --class com.ajia.sparkdemo.SparkMain hdfs://hadoopMaster:9000/tmp/engine/spark-demo-0.0.1-SNAPSHOT.jar pro
5.2调试
spark-submit --queue low --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8888" --name demoTest --master yarn --executor-cores 10 --num-executors 10 --executor-memory 8G --class com.ajia.sparkdemo.SparkMain hdfs://hadoopMaster:9000/tmp/engine/spark-demo-0.0.1-SNAPSHOT.jar pro
3.资源队列
3.1. 调度器
* FIFO:只有一个队列,所有用户共享。资源分配的过程也非常简单,先到先得,所以很容易出现一个用户占满集群所有资源的情况。
* CapacityScheduler:在FIFO的基础上,增加多用户支持,每个用户都可以使用特定量的资源,但集群空闲时,也可以使用整个集群的资源。
* FairScheduler:每个用户只有配置最大特定数量的资源可以用,不可能超出这个限制,即使集群整体很空闲。支持多资源类型。队列之间可以借用。
6.性能
Shuffle
数据倾斜
Persist(缓存)
Checkpoint (容错)