In Spark, each action operator triggers a job. Normally these jobs are executed serially, regardless of how the underlying RDDs depend on each other. For computations that have no dependencies on each other, we can instead submit the jobs from multiple threads and run them concurrently, which can be much faster: just as a stage's runtime is determined by its slowest task, once jobs run in parallel the application's runtime is no longer the sum of all job times but roughly the time of the slowest job.
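For contrast, here is a minimal sketch of the serial baseline (it reuses the DataFrame and output paths from the Futures example further below; the session name is illustrative). Each action is its own job, and the second job will not start until the first has finished, even though the two are completely independent:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder.appName("serialjobs").getOrCreate()
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 100).toDF

// each action below is a separate job; they run one after the other
df.distinct.write.parquet("hdfs:///dis.parquet")           // job 1 runs to completion first
df.agg(sum("value")).write.parquet("hdfs:///sum.parquet")  // job 2 only starts afterwards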
Using Java's multithreading library directly:
import java.util.concurrent.{Callable, Executors, TimeUnit}

/*** shared preparation code ***/

/*** independent computations, submitted concurrently ***/
// create a thread pool
val executors = Executors.newFixedThreadPool(20)

// business computation 1
val task1 = executors.submit(new Callable[String] {
  override def call(): String = {
    // computation code
    "business task 1 finished"
  }
})

// business computation 2
val task2 = executors.submit(new Callable[String] {
  override def call(): String = {
    // computation code
    "business task 2 finished"
  }
})

// stop accepting new work, then wait (up to one hour) for the submitted tasks to finish
executors.shutdown()
executors.awaitTermination(1, TimeUnit.HOURS)
print(task1.get() + task2.get())
The approach above feels very Java-like and uses only the Java class library, which is perfectly fine: the job-submitting code running on the Spark driver is not much different from an ordinary Java program; the driver just hands the actual computation to the executors and keeps communicating with them.
Using Scala multithreading (Futures):
import java.util.concurrent.Executors

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {

  def appMain(args: Array[String]): Unit = {
    // configure spark
    val spark = SparkSession
      .builder
      .appName("parjobs")
      .getOrCreate()
    import spark.implicits._

    // the thread-pool size bounds how many jobs can be submitted concurrently
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)

    val df = spark.sparkContext.parallelize(1 to 100).toDF

    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")

    // wait for both jobs to finish before exiting the app, then release the pool
    Await.result(Future.sequence(Seq(taskA, taskB)), Duration(1, MINUTES))
    xc.shutdown()
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath)
  }
}
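One related note, offered as a hedged suggestion rather than something required by the examples above: jobs submitted concurrently from several threads still share the executors of the single application under Spark's internal job scheduler, which is FIFO by default. If you want the parallel jobs to get a roughly even share of resources, you can switch on the FAIR scheduler when building the session, for example:
val spark = SparkSession
  .builder
  .appName("parjobs")
  .config("spark.scheduler.mode", "FAIR") // default is FIFO
  .getOrCreate()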