Spark Job Runs Slowly or Hangs: Shuffle Optimization

spark | 2019-09-13 12:08:22

1. Background

  • Symptom
    The user's Spark job normally finishes in about 40 minutes. Occasionally the driver hangs, no new tasks appear in the UI, and the application fails after roughly 2 hours. For reasons on the customer side, detailed driver logs could not be obtained.

  • Basic information:

Component    Version    Module
Spark        1.5.1      dataframe.write.mode(SaveMode.Overwrite).parquet(path)
  • Core business code (a self-contained sketch of this flow follows the list below):
dataFrame.write.mode(SaveMode.Overwrite).parquet(path)
sqlContext.sql("load data inpath '" + path + "' overwrite into table " + tableName)
  • Stack trace of the hung driver:
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
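The two lines of business code above are shown out of context. As a minimal, self-contained sketch of the same write-then-load flow on Spark 1.5, assuming a Hive-enabled context (the object name, input path, staging path and table name are illustrative placeholders, not taken from the customer job):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

object WriteAndLoad {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("write-and-load"))
    // A HiveContext is needed so that "load data inpath ... overwrite into table" targets a Hive table.
    val sqlContext = new HiveContext(sc)

    val path = "hdfs:///tmp/staging/parquet"   // illustrative staging directory
    val tableName = "target_table"             // illustrative Hive table name

    // The real job builds dataFrame from its own inputs; here we just read some parquet data.
    val dataFrame = sqlContext.read.parquet("hdfs:///tmp/input/parquet")

    // Step 1: overwrite the staging directory with the freshly computed output.
    dataFrame.write.mode(SaveMode.Overwrite).parquet(path)

    // Step 2: move the staged files into the Hive table, replacing its current contents.
    sqlContext.sql("load data inpath '" + path + "' overwrite into table " + tableName)

    sc.stop()
  }
}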

2. Analysis

  • Analyzing the corresponding application log

At 2019-01-16 15:53:11,748 we find:

 WARN  | [driver-heartbeater] | Error sending message [message = Heartbeat(28,[Lscala.Tuple2;@2043010d,BlockManagerId(28, 10.180.181.196, 23640))] in 2 attempts | org.apache.spark.Logging$class.logWarning(Logging.scala:92)

Eventually the heartbeat between the executor and the driver fails:

2019-01-16 15:53:20,101 | ERROR | [dispatcher-event-loop-43] | Driver 10.180.143.218:23214 disassociated! Shutting down. | org.apache.spark.Logging$class.logError(Logging.scala:75)
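These heartbeat messages are governed by a pair of timeouts. As a hedged sketch (the parameter names are standard Spark 1.5 configuration keys; the values shown are purely illustrative), they can be relaxed when the network is suspect:

val heartbeatConf = new org.apache.spark.SparkConf()
  // How often each executor reports a heartbeat to the driver (default 10s).
  .set("spark.executor.heartbeatInterval", "30s")
  // Fallback timeout for network interactions, including heartbeat and RPC timeouts (default 120s).
  .set("spark.network.timeout", "300s")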

In addition, there are a large number of shuffle exception logs:

2019-01-16 14:12:01,709 | WARN  | [shuffle-server-0] | Exception in connection from /10.180.181.153:49936 | org.apache.spark.network.server.TransportChannelHandler.exceptionCaught(TransportChannelHandler.java:79)
io.netty.handler.codec.CorruptedFrameException: negative pre-adjustment length field: -6101251590885211657
        at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:377)
        at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:327)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)
2019-01-16 14:12:01,710 | INFO  | [shuffle-server-0] | DIGEST41:Unmatched MACs | com.sun.security.sasl.digest.DigestMD5Base$DigestPrivacy.unwrap(DigestMD5Base.java:1481)
2019-01-16 14:12:01,710 | WARN  | [shuffle-server-0] | Exception in connection from /10.180.181.153:49936 | org.apache.spark.network.server.TransportChannelHandler.exceptionCaught(TransportChannelHandler.java:79)
io.netty.handler.codec.DecoderException: javax.security.sasl.SaslException: DIGEST-MD5: Out of order sequencing of messages from server. Got: 505 Expected: 501
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:300)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)
Caused by: javax.security.sasl.SaslException: DIGEST-MD5: Out of order sequencing of messages from server. Got: 505 Expected: 501
        at com.sun.security.sasl.digest.DigestMD5Base$DigestPrivacy.unwrap(DigestMD5Base.java:1489)
        at com.sun.security.sasl.digest.DigestMD5Base.unwrap(DigestMD5Base.java:213)
        at org.apache.spark.network.sasl.SparkSaslServer.unwrap(SparkSaslServer.java:149)
        at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:127)
        at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:102)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)

Is the shuffle exception also the root cause of the failure?

 

  • Analyzing executor logs from a few more nodes
    1. Node HQCDDSJAPP181008_26009: the executor printed no logs at all between 14:13:17,335 and 15:13:17,292.

  • Further analysis of the driver logs

1. Since the driver logs could not be obtained, no further analysis was possible there. We first rule out interference from speculative execution and ask the customer to disable Spark speculation via spark.speculation (a configuration sketch appears at the end of this subsection).

2. With speculation disabled, the job still failed: the number of executor failures reached the configured maximum.

Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (150) reached) 

All of these failures come with a large number of socket exceptions showing that network communication between executor and driver was broken:

Caused by: java.io.IOException: Connection from 10.180.143.218/10.180.143.218:23781 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
        at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)

We still suspect a network problem.
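For step 1 above, a hedged sketch of how speculation is typically switched off; the property name is standard, while whether it is set in code or on the command line is an assumption about how this job is submitted:

// In code, before the SparkContext is created:
val noSpecConf = new org.apache.spark.SparkConf()
  .set("spark.speculation", "false")   // disable speculative re-execution of slow tasks

// Or, equivalently, at submission time:
//   spark-submit --conf spark.speculation=false ...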

  • Logs were collected again on 2019-01-21.

1. Analyzing the AM log, we find that it prints nothing from shortly after 15:07 until 17:25:

2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181113:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181132:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181077:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181045:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181080:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181140:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181164:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181103:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181104:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181033:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181109:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,499 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181046:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,500 | INFO  | [Reporter] | Received new token for : HQCDDSJAPP181182:26009 | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.populateNMTokens(AMRMClientImpl.java:372)
2019-01-21 15:07:46,505 | INFO  | [Reporter] | Received 29 containers from YARN, launching executors on 0 of them. | org.apache.spark.Logging$class.logInfo(Logging.scala:59)
2019-01-21 17:25:56,565 | INFO  | [dispatcher-event-loop-2] | Driver terminated or disconnected! Shutting down. 10.180.143.218:23073 | org.apache.spark.Logging$class.logInfo(Logging.scala:59)
2019-01-21 17:25:56,567 | INFO  | [dispatcher-event-loop-2] | Unregistering ApplicationMaster with FAILED (diag message: Driver has been dead, no need to retry the AM.) | org.apache.spark.Logging$class.logInfo(Logging.scala:59)
2019-01-21 17:25:56,573 | INFO  | [dispatcher-event-loop-2] | Waiting for application to be successfully unregistered. | org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:394)
2019-01-21 17:25:56,675 | INFO  | [dispatcher-event-loop-2] | Deleting staging directory .sparkStaging/application_1547839891509_15295 | org.apache.spark.Logging$class.logInfo(Logging.scala:59)
2019-01-21 17:25:58,197 | INFO  | [pool-1-thread-1] | Shutdown hook called | org.apache.spark.Logging$class.logInfo(Logging.scala:59)

We also find that TID 13203 ran for more than 40 minutes, in stage 31. Although that stage 31 task was slow, it did eventually succeed. Spark shuffle is fairly demanding on network performance, so we prepare the following shuffle tuning to avoid problems with individual slow tasks:

spark.shuffle.file.buffer=64k
spark.reducer.maxSizeInFlight=96M
spark.network.timeout=300s
spark.rpc.askTimeout=300s
spark.shuffle.io.serverThreads=8
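For reference, a hedged sketch of applying these settings when the job is built; the keys are standard Spark configuration names (spark.shuffle.io.serverThreads is a Netty transport setting read by the shuffle module), the values are simply those listed above, and they can equally be passed as spark-submit --conf options:

val tunedConf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.file.buffer", "64k")       // larger in-memory buffer per shuffle file writer
  .set("spark.reducer.maxSizeInFlight", "96m")   // more shuffle data fetched in flight per reducer
  .set("spark.network.timeout", "300s")          // relax the fallback network timeout
  .set("spark.rpc.askTimeout", "300s")           // relax the RPC ask timeout
  .set("spark.shuffle.io.serverThreads", "8")    // more Netty server threads serving shuffle data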

Conclusion

1. Some tasks run slowly because of shuffle performance; tune the shuffle parameters to work around this:

spark.shuffle.file.buffer=64k
spark.reducer.maxSizeInFlight=96M
spark.network.timeout=300s
spark.rpc.askTimeout=300s
spark.shuffle.io.serverThreads=8


2. The impact of network problems cannot be ruled out. We attempted to adjust the following OS parameters, but the customer's production …

net.ipv4.tcp_keepalive_time = 15
net.ipv4.tcp_keepalive_probes = 10
net.ipv4.tcp_keepalive_intvl = 30


3. Disable SASL

spark.authenticate.enableSaslEncryption=false
spark.authenticate=false
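A hedged note on this last step: spark.authenticate generally has to agree between the application and the cluster-side shuffle service, so the sketch below shows only the application-side settings and assumes the YARN / external shuffle service side is configured consistently:

val saslOffConf = new org.apache.spark.SparkConf()
  // Assumption: the NodeManager / external shuffle service authentication settings match,
  // otherwise authenticated and unauthenticated peers cannot talk to each other.
  .set("spark.authenticate", "false")
  .set("spark.authenticate.enableSaslEncryption", "false")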

 
