java.lang.classcastexception:org.apache.avro.generic.genericdata$record不能强制转换为packagename.myrecord

50few1ms  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(287)

我正在尝试使用spark 1.5.1(scala 2.10.2)从hdfs(spark avro 1.7.7)读取一些.avro文件,以便对它们进行一些计算。
现在,假设我已经彻底搜索了web以找到解决方案(到目前为止,最好的链接是建议使用genericrecord的链接,而这个链接报告了相同的问题,而这个链接对我不起作用,因为它提供的代码与我使用的代码几乎相同),我在这里问,因为可能有人也有同样的想法。代码如下:

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} 
import org.apache.hadoop.io.NullWritable 
import org.apache.spark.{SparkConf, SparkContext}

object SparkPOC {

  def main(args: Array[String]): Unit ={

    val conf = new SparkConf()
      .setAppName("SparkPOC")
      .set("spark.master", "local[4]")
    val sc = new SparkContext(conf)
    val path = args(0)
    val profiles = sc.hadoopFile(
      path,
      classOf[AvroInputFormat[MyRecord]],
      classOf[AvroWrapper[MyRecord]],
      classOf[NullWritable]
    )

    val timeStamps = profiles.map{ p => p._1.datum.getTimeStamp().toString}
    timeStamps.foreach(print)

}

我得到以下信息:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
    at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
    at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

有人有线索吗?我也在考虑使用spark avro的可能性,但是它们不支持同时读取多个文件(而.hadoopfile支持通配符)。否则,我似乎不得不使用genericord并使用.get方法,从而失去了编码模式(myrecord)的优势。
提前谢谢。

tktrz96b

tktrz96b1#

我通常把它作为genericord读入,并根据需要显式地转换

val conf = sc.hadoopConfiguration
sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable], conf).map(_._1.datum().asInstanceOf[MyRecord])
k7fdbhmy

k7fdbhmy2#

在我设置kryoserializer和spark.kryo.registator类之后,问题就解决了,如下所示:

val config = new SparkConf()
  .setAppName(appName)
  .set("spark.master", master)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.mypackage.AvroKryoRegistrator")

avrokryoregistator就是这样的。

相关问题