1. The Exception
While testing a write of Dataset data into HBase from spark-shell, I ran into the exception "Job in state DEFINE instead of RUNNING".
My code is as follows:
val data = dataset.rdd.mapPartitions(par => {
  par.map(row => {
    val rowkey = row.getAs[Long]("id")
    val put = new Put(Bytes.toBytes(rowkey))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(row.mkString))
    (new ImmutableBytesWritable, put)
  })
})

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "master:2181")
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)

val jobConf = new JobConf(hbaseConf, this.getClass)
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "hb")

val desc = new HTableDescriptor(TableName.valueOf("hb"))
val hcd = new HColumnDescriptor("cf")
desc.addFamily(hcd)
val admin = hbaseConn.getAdmin
admin.createTable(desc)

val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

data.saveAsNewAPIHadoopDataset(job.getConfiguration)
Running this in spark-shell produces the following exception:
scala> val job = Job.getInstance(jobConf)
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
  at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:292)
  at org.apache.hadoop.mapreduce.Job.toString(Job.java:457)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
  at .$print$lzycompute(<console>:10)
  at .$print(<console>:6)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
  at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
  at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
  at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
  at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
  at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
  at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
  at org.apache.spark.repl.Main$.doMain(Main.scala:76)
  at org.apache.spark.repl.Main$.main(Main.scala:56)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2. Analysis of the Exception
From the exception message, the problem is simply that the Job object is in the wrong state.
In spark-shell, the REPL prints the value of every expression it evaluates, so as soon as the Job is created it calls the object's toString method to render it. Job.toString, however, first checks the job's state and requires it to be JobState.RUNNING; a freshly created job is still in JobState.DEFINE, so the check throws the IllegalStateException shown above. The code itself is fine; it is the REPL's attempt to print the Job that fails.
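To make the mechanism concrete, the check inside org.apache.hadoop.mapreduce.Job behaves roughly like the sketch below. This is a hedged Scala paraphrase based on the stack trace (ensureState called from toString), not the actual Hadoop source, which is Java:

// Sketch only: illustrates why printing a freshly created Job throws.
object JobStateSketch {
  sealed trait JobState
  case object DEFINE  extends JobState
  case object RUNNING extends JobState

  class Job {
    private var state: JobState = DEFINE        // a new Job starts in DEFINE

    def submit(): Unit = { state = RUNNING }     // state only changes once the job runs

    private def ensureState(required: JobState): Unit =
      if (state != required)
        throw new IllegalStateException(s"Job in state $state instead of $required")

    // toString insists the job is already RUNNING, which a freshly
    // created job never is -- hence the exception when the REPL prints it.
    override def toString: String = {
      ensureState(RUNNING)
      "Job: ..."
    }
  }
}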
3. Solutions
Solution 1:
Do not run the code above interactively in spark-shell. Package it into an application and submit it with spark-submit; since nothing there prints the Job object, its toString (and therefore the state check) is never invoked. A minimal skeleton is sketched below.
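The skeleton below is only a sketch of that approach; the object name HBaseWrite is made up for illustration, and the body is the same code as in section 1:

import org.apache.spark.sql.SparkSession

object HBaseWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HBaseWrite").getOrCreate()

    // Build the dataset, HBase configuration, JobConf and Job exactly as in section 1,
    // then perform the write:
    // data.saveAsNewAPIHadoopDataset(job.getConfiguration)

    spark.stop()
  }
}

It would then be submitted with something like spark-submit --class HBaseWrite plus the path to your assembled jar.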
Solution 2:
Declare the Job with lazy val. The REPL does not evaluate (and therefore does not print) a lazy value at the point of definition, so the Job is only instantiated the first time it is actually used and its toString is never triggered.
lazy val job = Job.getInstance(jobConf)
// or, if you only need the SparkContext's Hadoop configuration:
// lazy val job = Job.getInstance(sc.hadoopConfiguration)
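In the REPL this looks roughly like the following (the exact output may vary by Scala version), with the placeholder <lazy> printed instead of the Job itself:

scala> lazy val job = Job.getInstance(jobConf)
job: org.apache.hadoop.mapreduce.Job = <lazy>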
Solution 3:
Wrap the Job inside another class. The REPL then prints the wrapper object (using its default toString) rather than the Job itself, so Job.toString is never called and the exception is avoided.
class JobWrapper(sc: SparkContext) {
  val job = Job.getInstance(sc.hadoopConfiguration)
}

val jobWrapper = new JobWrapper(sc)
FileInputFormat.setInputPaths(jobWrapper.job, paths)
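Applied to the HBase write from section 1, the same idea might look like the sketch below (assuming the same imports as in section 1; the wrapper and variable names here are made up for illustration):

class HBaseJobWrapper(conf: org.apache.hadoop.conf.Configuration) {
  // The Job lives inside the wrapper, so the REPL only ever prints the
  // wrapper (default toString), never the Job, and no state check runs.
  val job: Job = Job.getInstance(conf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
}

val wrapper = new HBaseJobWrapper(jobConf)
data.saveAsNewAPIHadoopDataset(wrapper.job.getConfiguration)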