解决spark shell 写入hbase 异常Job in state DEFINE instead of RUNNING

hbase | 2019-09-13 10:02:39

1.异常出现

我在spark shell测试把dataset数据写入到hbase遇到异常Job in state DEFINE instead of RUNNING

我的代码如下

val data=dataset.rdd.mapPartitions(par=>{
  par.map(row=>{
val rowkey=row.getAs[Long]("id")
val put = new Put(Bytes.toBytes(rowkey))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(row.mkString))
(new ImmutableBytesWritable,put)
  })
})
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "master:2181")
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val jobConf = new JobConf(hbaseConf, this.getClass)
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "hb")
val desc = new HTableDescriptor(TableName.valueOf("hb"))
val hcd = new HColumnDescriptor("cf")
desc.addFamily(hcd)
val admin = hbaseConn.getAdmin
admin.createTable(desc)
val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
data.saveAsNewAPIHadoopDataset(job.getConfiguration)


在spark-shell中执行出现下面异常


scala>     val job = Job.getInstance(jobConf)
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
  at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:292)
  at org.apache.hadoop.mapreduce.Job.toString(Job.java:457)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
  at .$print$lzycompute(<console>:10)
  at .$print(<console>:6)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
  at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
  at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
  at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
  at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
  at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
  at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
  at org.apache.spark.repl.Main$.doMain(Main.scala:76)
  at org.apache.spark.repl.Main$.main(Main.scala:56)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


2.异常分析

从异常信息来看,就是job状态不对,

在Spark shell模式下,每运行一行代码其都会输出这个对象,所以在初始化job的时候会调用其toString方法来打印出这个对象;但是在toString方法的实现里面会对其状态进行检查,确保job实例是JobState.RUNNING状态,但是这个时候job的状态是JobState.DEFINE,所以会导致异常。


3.解决方法


解决方法一:

不要再spark-shell中执行上面代码,使用spark-submit来提交执行代码,这样就不会检查状态


解决方法二:

使用lazy来初始化定义对象,这样会只有job对象被真正使用的时候才会初始化

lazy val job = Job.getInstance(sc.hadoopConfiguration)lazy val job = Job.getInstance(jobConf)


解决方法三:

将Job对象封装到类里面,这样就不会调用Job的toString方法,这样就可以避免出现异常

class JobWrapper(sc:SparkContext){ val job = Job.getInstance(sc.hadoopConfiguration); }
val jobWrapper = new JobWrapper(sc)
FileInputFormat.setInputPaths(jobWrapper.job, paths)


登录后即可回复 登录 | 注册
    
关注编程学问公众号