Running Spark produces the following exception:
21/04/25 09:28:55 ERROR logging.DriverLogger$DfsAsyncWriter: Failed writing driver logs to dfs
java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:153)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.util.logging.DriverLogger$DfsAsyncWriter.run(DriverLogger.scala:136)
at org.apache.spark.util.logging.DriverLogger$DfsAsyncWriter.org$apache$spark$util$logging$DriverLogger$DfsAsyncWriter$$close(DriverLogger.scala:151)
at org.apache.spark.util.logging.DriverLogger$DfsAsyncWriter$$anon$1.run(DriverLogger.scala:167)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
21/04/25 09:28:55 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:469)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:622)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:578)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:134)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$org$apache$spark$scheduler$EventLoggingListener$$logEvent$3.apply(EventLoggingListener.scala:154)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$org$apache$spark$scheduler$EventLoggingListener$$logEvent$3.apply(EventLoggingListener.scala:154)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.EventLoggingListener.org$apache$spark$scheduler$EventLoggingListener$$logEvent(EventLoggingListener.scala:154)
at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:227)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1350)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
Taken literally, the HDFS FileSystem has been closed, so my writes no longer go through.
I found a piece of code that does indeed close the FileSystem after reading a file:
import java.io.InputStream
import java.net.URI
import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils
import org.apache.curator.utils.CloseableUtils // assuming Curator's close-quietly helper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def readFromHdfs(path: String): String = {
  var fs: FileSystem = null
  var is: InputStream = null
  try {
    // FileSystem.get hands back the JVM-wide cached instance for this URI
    fs = FileSystem.get(URI.create(path), new Configuration)
    is = fs.open(new Path(path))
    IOUtils.toString(is, StandardCharsets.UTF_8)
  } finally {
    CloseableUtils.closeQuietly(is)
    // closing the cached FileSystem here is what later breaks the driver/event loggers
    CloseableUtils.closeQuietly(fs)
  }
}
I created this FileSystem myself, so why can't I close it myself? But it is obtained through get(), so let's look at the source.
Source of the get method:
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
So the instance comes out of a cache, which is essentially a simple map: every caller of get() for the same URI (as the same user) gets back the same shared object, so closing it in my code also closes it for Spark's own loggers. The fix is simply not to close it.
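To make the sharing concrete, here is a minimal sketch (the NameNode address is made up) showing that two get() calls return the very same cached object, and that newInstance() is the cache-bypassing alternative if an independently closable instance is really needed:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val uri  = URI.create("hdfs://namenode:8020/") // hypothetical NameNode address

    // Both calls go through FileSystem.CACHE, so they return the same object.
    val fs1 = FileSystem.get(uri, conf)
    val fs2 = FileSystem.get(uri, conf)
    println(fs1 eq fs2) // true: one shared instance per (scheme, authority, user)

    fs1.close()
    // From here on, fs2 -- and any Spark-internal code holding the same cached
    // instance -- fails with java.io.IOException: Filesystem closed.

    // If a privately owned, closable instance is really needed, newInstance()
    // bypasses the cache, so closing it does not affect anyone else:
    val privateFs = FileSystem.newInstance(uri, conf)
    privateFs.close()
  }
}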
Beyond that, I changed my code to build the FileSystem directly from the Hadoop configuration carried by the Spark environment:
val hdfs = FileSystem.get(Env.SPARK.sparkContext.hadoopConfiguration)
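With that, the read helper only needs to close the stream it opened and must leave the shared FileSystem alone. A sketch of the revised version, assuming Env.SPARK exposes the SparkSession as in the line above:

import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}

def readFromHdfs(path: String): String = {
  // Shared, cached FileSystem built from Spark's own Hadoop configuration.
  val fs = FileSystem.get(Env.SPARK.sparkContext.hadoopConfiguration)
  val is = fs.open(new Path(path))
  try {
    IOUtils.toString(is, StandardCharsets.UTF_8)
  } finally {
    // Close only the stream; the FileSystem stays open for the rest of the JVM.
    IOUtils.closeQuietly(is)
  }
}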