spark streaming 并发-实现多线程并行处理任务

spark | 2019-09-13 10:02:39

场景:

程序需要处理的相似job数随着业务的增长越来越多  

我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。

spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。


例子:

目前在做的反作弊规则引擎,离线的处理程序是spark,在线的规则引擎是spark streaming。

离线的问题不大,在线规则引擎如果因为规则越来越多,每次batch处理的时间越来越长是不合理的。


方案:

spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。

而spark streaming同样可以。


代码:

DStream.foreachRDD{
      rdd =>
        //创建线程池
        val executors=Executors.newFixedThreadPool(rules.length)
        //将规则放入线程池
        for( ru <- rules){
          val task= executors.submit(new Callable[String] {
            override def call(): String ={
              //执行规则
              runRule(ru,spark)
            }
          })
        }
        //每次创建的线程池执行完所有规则后shutdown
        executors.shutdown()
    }


注意:

1.最后需要executors.shutdown()。

如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。

如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。

如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。


2.可不可以将创建线程池放到foreachRDD外面?  

不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。


3.线程池executor崩溃了就会导致数据丢失  

原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。


结果

对于目前的规则引擎项目,使用了多线程并发提交Job的操作,可以在毫秒级处理多数据源大吞吐量且存在数据量波动的数据流。并且在资源充足的情况下,可以随意横向拓展业务且不增加延迟时间。



登录后即可回复 登录 | 注册
    
关注编程学问公众号