Spark read/write of Avro with Kafka fails

lzfw57am · published 2021-07-15 in Kafka

I am trying to use Spark to move data from Postgres to Kafka and vice versa. On the producer side, I am doing:

df.select(cols).as("data").select(to_avro(struct("data.*")).as("value")).write().format("kafka")
   .option("kafka.bootstrap.servers", serverdetails).option("topic", topicname).save();

On the consumer side, I first load the Avro schema:

String schema = new String(Files.readAllBytes(Paths.get("avsc file path")));

and then run:

spark.read().format("kafka").option("kafka.bootstrap.servers", serverdetails).option("subscribe", topic)
   .load().select(from_avro(col("value"), schema).as("data")).select("data.*");
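
And a matching sketch of the consumer side, with the same placeholder and version caveats; the schema path below is a stand-in for the real .avsc location. Note that from_avro expects the value bytes to be plain Avro binary written with a schema compatible with the JSON schema string passed in.

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.avro.functions.from_avro;
import static org.apache.spark.sql.functions.col;

public class KafkaToPostgresJob {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("kafka-to-pg").getOrCreate();

        // Read the writer's Avro schema (a JSON .avsc file); the path is
        // a placeholder.
        String schema = new String(Files.readAllBytes(Paths.get("/path/to/schema.avsc")));

        Dataset<Row> rows = spark.read()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker1:9092")
                .option("subscribe", "my_topic")
                .load()
                // Decode the raw Kafka value bytes with the schema; the
                // alias belongs on the decoded column so that "data.*"
                // can be expanded in the next select.
                .select(from_avro(col("value"), schema).as("data"))
                .select("data.*");

        rows.show();
    }
}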

At runtime, I get the error below:

java.io.EOFException: null
                at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:944) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) ~[avro-1.8.2.jar:1.8.2]
                at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50) ~[spark-avro_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[na:na]
                at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636) ~[spark-sql_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) ~[spark-sql_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.scheduler.Task.run(Task.scala:123) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411) ~[spark-core_2.12-2.4.5.jar:2.4.5]
                at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) ~[spark-core_2.12-2.4.5.jar:2.4.5]
