spark多线程并发执行job

java | 2021-04-23 10:45:53

spark中具体的action算子对应一个job,通常无论rdd依赖关系如何,我们都是串行执行job,我们也可以把没有相互依赖关系的计算,使用多线程并发计算,这样速度快很多,就像一个stage的时间是取决于最慢的task,并发后spark的时间就不是所有job时间加起来,而是最慢的job的时间。

使用java的多线程类库的方式:

/***公共计算代码***/


    /***并发独立计算代码***/

    //创建线程池
    val executors = Executors.newFixedThreadPool(20)

    //计算业务1
    val task1 = executors.submit(new Callable[String] {
      override def call(): String = {
        //计算代码
        "业务1计算完成"
      }
    })

    //计算业务2
    val task2 = executors.submit(new Callable[String] {
      override def call(): String = {
        //计算代码
        "业务2计算完成"
      }
    })


    executors.awaitTermination(1, TimeUnit.HOURS)
    print(task1.get()+task2.get())

上面这种方式很java,使用的也是java的类库,这完全没有问题,因为 spark driver上面执行的 job是和普通java程序没多大区别,知识driver会把具体计算交给executor,并保持通信。

 

scala多线程:

import org.apache.spark.sql.SparkSession
import import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    // Set number of threads via a configuration property
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)
    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath) 
  }
}

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号