spark知识培训记录文档

2020-04-20 14:49:28 | 编辑

Spark

 

1.介绍

1.1批处理

Apache Spark™ is a unified analytics engine for large-scale data processing.(分布式批处理引擎)

spark Streaming 微批处理 流式计算

flink基于事件驱动

 

spark对比mapreduce的区别 基于本地数据加载 基于内存计算

 

2.集群模式

2.1集群模式

spark Standalone

spark on yarn

 

 

2.2 集群搭建

Apache(hadoop+spark)

CDH

Ambari

 

4.概念组件

4.1 组件

Spark Core 底层计算交互,将代码转换成计算任务,完成任务调度

 

Spark sql (jpa代码 sql)

spark Streaming

MLlib 机器学习

GraphX 图形计算

 

4.2术语概念

Worker

Instance

Application

 

 

driver

executor

 

 

rddResilient Distributed Datasets 弹性分布式数据集)(dataset,dataframe)

DAG(Directed Acyclic Graph):有向无环图

 

 

Job (一个action)

Stages

task

 

transformation算子 lazy操作)

action算子

 

 

5.案例

5.1开发环境

安装scala插件

 

 

然后就可以新建scala

 

 

新建一个scala项目选择scala

 

 

下载使用sdk

 

 

 

5.2开发案例(maven

 

scala案例代码



import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
  * spark demo 计算所有学校总得分的排名
  */
//class定义类和java一样要new才能调用,object定义类相当于static 单例 可以直接调用,case class自动构造方法和get set方法
object SparkMain {

  /**
    *
    * @param args 参数0-{取多少条数据}
    */
  def main(args: Array[String]): Unit = {

    try {
      //获取参数 开发模式
      val isDev = if (args.length >= 1) args(0) == "dev" else false

      //初始化spark
      val spark = if (isDev) {
        System.setProperty("hadoop.home.dir", "D:\\bigdata\\hadoop-2.7.3")

        SparkSession.builder().appName("Sparkdemo")
          //.config("spark.some.config.option", "some-value")
          .config("spark.debug.maxToStringFields", "100")
          .master("local[*]")
          .enableHiveSupport()
          .getOrCreate()
      } else {
        SparkSession.builder().appName("spark-demo")
          //.config("spark.some.config.option", "some-value")
          .enableHiveSupport()
          .getOrCreate()
      }

      //设置Checkpoint路径
      spark.sparkContext.setCheckpointDir("hdfs://hadoopMaster:9000/tmp/engine/sparkdemo/")

      //加载mysql数据 到rdd
      var rdd=spark.read.format("jdbc")
        .option("url", "jdbc:mysql://10.10.22.154:3306/datacenter")
        .option("user", "root")
        .option("password","znxunzhi")
        .option("dbtable","subject_total_score")
        .option("partitionColumn", "id")
        .option("lowerBound", "0")
        .option("upperBound", 5000000)
        .option("numPartitions", 1000)
        .load()

      rdd.persist()

      //引入隐式表达式
      import spark.sqlContext.implicits._

      /***计算每个学校的总分的学校排名***/
      //1.先计算总分
      var dataframe=rdd.groupBy("SCHOOL_NAME").agg(sum("TOTAL_SCORE").as("SCHOOL_MAX_SCORE"))

      //如果计算链很长
      //dataframe.persist()
      dataframe.checkpoint()
      //dataframe.show(10)


      //2.计算总分的排名
      dataframe=dataframe.withColumn("SCHOOL_MAX_SCORE_RANK",rank().over(Window.partitionBy().orderBy($"SCHOOL_MAX_SCORE".desc)))

      //保存计算结果
      val connectionProperties: Properties = new Properties
      connectionProperties.put("user", "root")
      connectionProperties.put("password", "znxunzhi")
      connectionProperties.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
      dataframe.write.mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo1",connectionProperties)


      ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

      /***计算每个学校的最高分的学校排名 sql 方式***/
      //先计算最高分
      rdd.createOrReplaceTempView("temp_total_score")
      val df1=rdd.sqlContext.sql("select SCHOOL_NAME,sum(TOTAL_SCORE) SCHOOL_MAX_SCORE from temp_total_score group by SCHOOL_NAME")

      df1.createOrReplaceTempView("temp_total_score_max")
      val df2=df1.sqlContext.sql("select SCHOOL_NAME,SCHOOL_MAX_SCORE,rank() over(ORDER BY SCHOOL_MAX_SCORE DESC) as SCHOOL_MAX_SCORE_RANK from temp_total_score_max")

      //保存计算结果
      val connectionProperties1: Properties = new Properties
      connectionProperties1.put("user", "root")
      connectionProperties1.put("password", "znxunzhi")
      connectionProperties1.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
      df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo2",connectionProperties)

      spark.close()
    } catch {
      case e: Exception => {
        Logger.getLogger(SparkMain.getClass).error("数据计算分析异常", e)
        throw e
      }
    }
  }

}

 

java案例代码



import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;

import java.util.Properties;

/**
 * @Description:
 * @Author: houyong
 * @Date: 2020/4/8
 */
public class SparkMainJava {
    public static void main(String[] args) {
        try {
            //获取参数 开发模式
            boolean isDev =args.length >= 1&&args[0] == "dev"?true:false;

            //初始化spark
            SparkSession spark = null;
            if (isDev) {
                System.setProperty("hadoop.home.dir", "D:\\bigdata\\hadoop-2.7.3");

                spark=SparkSession.builder().appName("Sparkdemo")
                        //.config("spark.some.config.option", "some-value")
                        .config("spark.debug.maxToStringFields", "100")
                        .master("local[*]")
                        .enableHiveSupport()
                        .getOrCreate();
            } else {
                spark=SparkSession.builder().appName("spark-demo")
                        //.config("spark.some.config.option", "some-value")
                        .enableHiveSupport()
                        .getOrCreate();
            }

            //设置Checkpoint路径
            spark.sparkContext().setCheckpointDir("hdfs://hadoopMaster:9000/tmp/engine/sparkdemo/");

            //加载mysql数据 到rdd
            Dataset rdd=spark.read().format("jdbc")
                    .option("url", "jdbc:mysql://10.10.22.154:3306/datacenter")
                    .option("user", "root")
                    .option("password","znxunzhi")
                    .option("dbtable","subject_total_score")
                    .option("partitionColumn", "id")
                    .option("lowerBound", "0")
                    .option("upperBound", 5000000)
                    .option("numPartitions", 1000)
                    .load();

            rdd.persist();

            /***计算每个学校的最高分的学校排名***/
            //1.先计算总分
            Dataset dataframe=rdd.groupBy("SCHOOL_NAME").agg(functions.sum("TOTAL_SCORE").as("SCHOOL_MAX_SCORE"));

            //如果计算链很长
            //dataframe.persist()
            //dataframe.checkpoint()
            //dataframe.show(10)


            //2.计算总分的排名
            dataframe=dataframe.withColumn("SCHOOL_MAX_SCORE_RANK",functions.rank().over(Window.partitionBy().orderBy(functions.col("SCHOOL_MAX_SCORE").desc())));

            //保存计算结果
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "root");
            connectionProperties.put("password", "znxunzhi");
            connectionProperties.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;");
            dataframe.write().mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo1",connectionProperties);


            ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

            /***计算每个学校的最高分的学校排名 sql 方式***/
            //先计算最高分
            rdd.createOrReplaceTempView("temp_total_score");
            Dataset df1=rdd.sqlContext().sql("select SCHOOL_NAME,sum(TOTAL_SCORE) SCHOOL_MAX_SCORE from temp_total_score group by SCHOOL_NAME");

            df1.createOrReplaceTempView("temp_total_score_max");
            Dataset df2=df1.sqlContext().sql("select SCHOOL_NAME,SCHOOL_MAX_SCORE,rank() over(ORDER BY SCHOOL_MAX_SCORE DESC) as SCHOOL_MAX_SCORE_RANK from temp_total_score_max");

            //保存计算结果
            Properties connectionProperties1 = new Properties();
            connectionProperties1.put("user", "root");
            connectionProperties1.put("password", "znxunzhi");
            connectionProperties1.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8;");
            df2.write().mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.22.154:3306/datacenter","sparkdemo2",connectionProperties);

            spark.close();
        } catch(Exception e){
            e.printStackTrace();
        }
    }
}

 

任务提交方式:spark-submit命令,rest

 

hadoop dfs -put spark-demo-0.0.1-SNAPSHOT.jar /tmp/engine

 

spark-submit --name demoTest --master yarn --executor-cores 10 --num-executors 10 --executor-memory 8G --class com.ajia.sparkdemo.SparkMain hdfs://hadoopMaster:9000/tmp/engine/spark-demo-0.0.1-SNAPSHOT.jar pro

 

5.2调试

spark-submit --queue low --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8888"  --name demoTest --master yarn --executor-cores 10 --num-executors 10 --executor-memory 8G --class com.ajia.sparkdemo.SparkMain hdfs://hadoopMaster:9000/tmp/engine/spark-demo-0.0.1-SNAPSHOT.jar pro

 

 

 

 

3.资源队列

3.1. 调度器

    * FIFO:只有一个队列,所有用户共享。资源分配的过程也非常简单,先到先得,所以很容易出现一个用户占满集群所有资源的情况。

    * CapacityScheduler:在FIFO的基础上,增加多用户支持,每个用户都可以使用特定量的资源,但集群空闲时,也可以使用整个集群的资源。

    * FairScheduler:每个用户只有配置最大特定数量的资源可以用,不可能超出这个限制,即使集群整体很空闲。支持多资源类型。队列之间可以借用。

 

 

6.性能

Shuffle

 

数据倾斜

 

Persist(缓存)

 

Checkpoint (容错)

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号