场景:
程序需要处理的相似job数随着业务的增长越来越多
我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。
例子:
目前在做的反作弊规则引擎,离线的处理程序是spark,在线的规则引擎是spark streaming。
离线的问题不大,在线规则引擎如果因为规则越来越多,每次batch处理的时间越来越长是不合理的。
方案:
spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。
而spark streaming同样可以。
代码:
DStream.foreachRDD{ rdd => //创建线程池 val executors=Executors.newFixedThreadPool(rules.length) //将规则放入线程池 for( ru <- rules){ val task= executors.submit(new Callable[String] { override def call(): String ={ //执行规则 runRule(ru,spark) } }) } //每次创建的线程池执行完所有规则后shutdown executors.shutdown() }
注意:
1.最后需要executors.shutdown()。
如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。
2.可不可以将创建线程池放到foreachRDD外面?
不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。
3.线程池executor崩溃了就会导致数据丢失
原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。
结果
对于目前的规则引擎项目,使用了多线程并发提交Job的操作,可以在毫秒级处理多数据源大吞吐量且存在数据量波动的数据流。并且在资源充足的情况下,可以随意横向拓展业务且不增加延迟时间。