Saving an RDD with a proprietary OutputFormat

Asked by 0x6upsns on 2021-05-29 in Hadoop

I am working with a proprietary database that ships its own OutputFormat. Using this OutputFormat I can write a MapReduce job and save the data produced by the MR job into this database.
Now I am trying to use the same OutputFormat from inside Spark and save an RDD to the database.
The code I have written is:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
// Vertica Hadoop connector classes; package name assumed from the connector jar
import com.vertica.hadoop.{VerticaInputFormat, VerticaOutputFormat, VerticaRecord}

object VerticaSpark extends App {
  val scConf = new SparkConf
  val sc = new SparkContext(scConf)

  // Configure the Hadoop job with the Vertica input and output formats
  val conf = new Configuration()
  val job = new Job(conf)
  job.setInputFormatClass(classOf[VerticaInputFormat])
  job.setOutputKeyClass(classOf[Text])
  job.setOutputValueClass(classOf[VerticaRecord])
  job.setOutputFormatClass(classOf[VerticaOutputFormat])

  VerticaInputFormat.setInput(job, "select * from Foo where key = ?", "1", "2", "3", "4")
  VerticaOutputFormat.setOutput(job, "Bar", true, "name varchar", "total int")

  // Read the source table as an RDD of VerticaRecord, aggregate, and map back to
  // (table name, VerticaRecord) pairs for the output format
  val rddVR: RDD[VerticaRecord] = sc.newAPIHadoopRDD(job.getConfiguration, classOf[VerticaInputFormat], classOf[LongWritable], classOf[VerticaRecord]).map(_._2)
  val rddTup = rddVR.map(x => (x.get(1).toString, x.get(2).toString.toInt))
  val rddGroup = rddTup.reduceByKey(_ + _)
  val rddVROutput = rddGroup.map {
    case (x, y) => (new Text("Bar"), getVerticaRecord(x, y, job.getConfiguration))
  }

  //rddVROutput.saveAsNewAPIHadoopFile("Bar", classOf[Text], classOf[VerticaRecord], classOf[VerticaOutputFormat], job.getConfiguration)
  rddVROutput.saveAsNewAPIHadoopDataset(job.getConfiguration)

  // Build a VerticaRecord for the output table from an aggregated (name, total) pair
  def getVerticaRecord(name: String, value: Int, conf: Configuration): VerticaRecord = {
    val retVal = new VerticaRecord(conf)
    //println(s"going to build Vertica Record with ${name} and ${value}")
    retVal.set(0, new Text(name))
    retVal.set(1, new IntWritable(value))
    retVal
  }
}

The complete project can be downloaded from here:
https://github.com/abhitechdojo/verticaspark.git
My code works fine until the saveAsNewAPIHadoopFile call is reached; at that line it throws a NullPointerException.
The same logic, with the same input and output format classes, works perfectly well in a MapReduce program, and I can write to the database successfully from the MR job:
https://my.vertica.com/docs/7.2.x/html/index.htm#authoring/hadoopintegrationguide/hadoopconnector/examplehadoopconnectorapplication.htm
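
For comparison, the MapReduce version is wired up roughly like the sketch below. This is only a simplified sketch based on the classes used in my Spark code above, not the exact code from the linked docs example; the FooMapper, BarReducer and VerticaMRExample names are placeholders, the com.vertica.hadoop package name is assumed from the connector jar, and the column positions are assumed to match the Spark code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import com.vertica.hadoop.{VerticaInputFormat, VerticaOutputFormat, VerticaRecord}

// Placeholder mapper: reads (LongWritable, VerticaRecord) pairs from VerticaInputFormat
// and emits (name, total) pairs, mirroring rddTup in the Spark code above.
class FooMapper extends Mapper[LongWritable, VerticaRecord, Text, IntWritable] {
  override def map(key: LongWritable, value: VerticaRecord,
                   context: Mapper[LongWritable, VerticaRecord, Text, IntWritable]#Context): Unit = {
    context.write(new Text(value.get(1).toString), new IntWritable(value.get(2).toString.toInt))
  }
}

// Placeholder reducer: sums the totals per name and writes a VerticaRecord keyed by the
// output table name, mirroring getVerticaRecord in the Spark code above. Here the
// Configuration is taken from the task context provided by the MR framework.
class BarReducer extends Reducer[Text, IntWritable, Text, VerticaRecord] {
  override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, VerticaRecord]#Context): Unit = {
    var total = 0
    val it = values.iterator()
    while (it.hasNext) total += it.next().get()
    val record = new VerticaRecord(context.getConfiguration)
    record.set(0, new Text(key.toString))
    record.set(1, new IntWritable(total))
    context.write(new Text("Bar"), record)
  }
}

// Driver with the same job wiring as the Spark version, but submitted as a plain MR job.
object VerticaMRExample {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration(), "vertica-mr-example")
    job.setJarByClass(classOf[FooMapper])
    job.setInputFormatClass(classOf[VerticaInputFormat])
    job.setOutputFormatClass(classOf[VerticaOutputFormat])
    job.setMapperClass(classOf[FooMapper])
    job.setReducerClass(classOf[BarReducer])
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[IntWritable])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[VerticaRecord])
    VerticaInputFormat.setInput(job, "select * from Foo where key = ?", "1", "2", "3", "4")
    VerticaOutputFormat.setOutput(job, "Bar", true, "name varchar", "total int")
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}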
The stack trace of the error is:

16/01/15 16:42:53 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 5, machine): java.lang.NullPointerException
    at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:39)
    at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:38)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:999)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 12, machine): java.lang.NullPointerException
    at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:39)
    at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:38)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:999)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/15 16:42:54 INFO TaskSetManager: Lost task 3.1 in stage 1.0 (TID 11) on executor machine: java.lang.NullPointerException (null) [duplicate 7]

No answers yet.

