Spark runtime error: Failed writing driver logs to dfs

spark | 2021-04-25 09:35:03

Running a Spark job throws the following exception:

21/04/25 09:28:55 ERROR logging.DriverLogger$DfsAsyncWriter: Failed writing driver logs to dfs
java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:153)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.util.logging.DriverLogger$DfsAsyncWriter.run(DriverLogger.scala:136)
	at org.apache.spark.util.logging.DriverLogger$DfsAsyncWriter.org$apache$spark$util$logging$DriverLogger$DfsAsyncWriter$$close(DriverLogger.scala:151)
	at org.apache.spark.util.logging.DriverLogger$DfsAsyncWriter$$anon$1.run(DriverLogger.scala:167)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
21/04/25 09:28:55 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:469)
	at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:622)
	at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:578)
	at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:134)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$org$apache$spark$scheduler$EventLoggingListener$$logEvent$3.apply(EventLoggingListener.scala:154)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$org$apache$spark$scheduler$EventLoggingListener$$logEvent$3.apply(EventLoggingListener.scala:154)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.EventLoggingListener.org$apache$spark$scheduler$EventLoggingListener$$logEvent(EventLoggingListener.scala:154)
	at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:227)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1350)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)

Taken literally, this means the HDFS FileSystem has been closed, so nothing can be written to it anymore.

Searching the code, I did find a place that closes the FileSystem after reading a file:

def readFromHdfs(path: String): String = {
  var fs: FileSystem = null
  var is: InputStream = null
  try {
    fs = FileSystem.get(URI.create(path), new Configuration)
    is = fs.open(new Path(path))
    IOUtils.toString(is, StandardCharsets.UTF_8)
  } finally {
    CloseableUtils.closeQuietly(is)
    CloseableUtils.closeQuietly(fs)  // <-- closes the FileSystem after the read
  }
}

My first thought was: I obtained this FileSystem myself, so why shouldn't I be allowed to close it? But the call here is get(), so let's look at the source code.

The source code of the get method:

public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

So the instance comes from a cache, and that CACHE is essentially just a map keyed by the filesystem URI and user: every caller asking for the same cluster gets back the same shared FileSystem object. Closing it in my read helper therefore also closed the handle that Spark's driver logger and event log writer were using. The fix is simply not to close it.
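To make the sharing concrete, here is a minimal sketch (the hdfs://nameservice1/ URI is only a placeholder for a real cluster address): with caching enabled, both get() calls resolve to the same CACHE entry and return the very same object, so closing "my" handle also invalidates the handle Spark's own writers hold.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
val uri  = URI.create("hdfs://nameservice1/")   // placeholder cluster address
val fs1  = FileSystem.get(uri, conf)
val fs2  = FileSystem.get(uri, conf)
assert(fs1 eq fs2)   // same cached instance, shared by every caller in this JVM

fs1.close()          // also closes fs2 -- and the stream that DriverLogger and
                     // EventLoggingListener are writing through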

In addition, I changed the code to obtain the FileSystem directly from the HDFS configuration already present in the Spark environment:

val hdfs = FileSystem.get(Env.SPARK.sparkContext.hadoopConfiguration)
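Putting the two together, here is a sketch of readFromHdfs rewritten to reuse the shared, cached FileSystem and close only the stream (Env.SPARK is this project's own Spark wrapper; IOUtils and CloseableUtils are assumed to be commons-io and Curator, as in the original snippet):

import java.io.InputStream
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.curator.utils.CloseableUtils
import org.apache.hadoop.fs.{FileSystem, Path}

def readFromHdfs(path: String): String = {
  // Shared, cached instance resolved from the job's Hadoop configuration -- never close it.
  val fs: FileSystem = FileSystem.get(Env.SPARK.sparkContext.hadoopConfiguration)
  var is: InputStream = null
  try {
    is = fs.open(new Path(path))
    IOUtils.toString(is, StandardCharsets.UTF_8)
  } finally {
    CloseableUtils.closeQuietly(is)   // close only the stream
  }
}

If a privately owned handle really is needed, the get() source above also shows an escape hatch: with fs.hdfs.impl.disable.cache set to true, createFileSystem returns an uncached instance that is safe to close; FileSystem.newInstance should give the same per-call behavior.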

 

 
