Spark Structured Streaming: IllegalStateException: This consumer has already been closed

Posted by thtygnil on 2021-06-08 in Kafka

So I'm using:
- Spark Structured Streaming 2.1.0
- Kafka 0.10.2.0
- Scala 2.11

I'm using the default Kafka API, so basically:

val df = spark.readStream
  .format("kafka")
  .option(...)
  .load()
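The `.option(...)` call above hides the actual settings. As a rough sketch only, assuming an SSL connection as described in the question, the source options could be gathered in one map. The helper name `KafkaSslOptions`, the broker address, topic, truststore path and password are all hypothetical placeholders. Keys prefixed with `kafka.` are passed through to the underlying Kafka consumer, while `subscribe` is read by the Spark Kafka source itself.

```scala
// Hypothetical helper: builds the option map for Spark's Kafka source over SSL.
object KafkaSslOptions {
  // Keys starting with "kafka." are forwarded to the KafkaConsumer;
  // "subscribe" is an option of the Spark Kafka source itself.
  def build(
      bootstrapServers: String,
      topic: String,
      truststorePath: String,
      truststorePassword: String
  ): Map[String, String] =
    Map(
      "kafka.bootstrap.servers"       -> bootstrapServers,
      "subscribe"                     -> topic,
      "kafka.security.protocol"       -> "SSL",
      "kafka.ssl.truststore.location" -> truststorePath,
      "kafka.ssl.truststore.password" -> truststorePassword
    )

  def main(args: Array[String]): Unit = {
    // Placeholder values only; substitute your own brokers, topic and truststore.
    val opts = build("broker1:9093", "events", "/path/to/truststore.jks", "changeit")

    // Print only the option keys, so the password is not echoed.
    opts.keys.toSeq.sorted.foreach(println)

    // With a SparkSession named `spark` in scope, the map would be applied as:
    //   val df = spark.readStream.format("kafka").options(opts).load()
  }
}
```

Keeping the settings in one map passed through `DataStreamReader.options` makes them easy to review, but it does not by itself change the consumer-closing behaviour discussed in this thread.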

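I set the options (over SSL) and everything. Then I apply some operations, start the stream, and it runs fine. However, from time to time it throws an exception: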

17/05/30 11:05:23 WARN TaskSetManager: Lost task 23.0 in stage 77.0 (TID 3329, spark-worker-3, executor 0): java.lang.IllegalStateException: This consumer has already been closed.
at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1611)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1622)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:278)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:177)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:89)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:147)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:136)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Any hints as to why this fails?

Answer #1, by sqserrrh:

This was fixed by https://issues.apache.org/jira/browse/spark-18682 while implementing the batch Kafka source. You should not see it in Spark 2.1.1. If you still see this error in Spark 2.1.1, please create a Spark ticket at https://issues.apache.org/jira/browse/spark
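Following the answer, the practical step is to run on Spark 2.1.1 or later. A minimal sketch, assuming an sbt build and Scala 2.11.8 (any 2.11.x resolves the same `_2.11` artifacts through `%%`), bumps both the Spark SQL dependency and the Kafka source to 2.1.1. Because `spark-sql` is marked `provided` here, the cluster the job is submitted to must itself run Spark 2.1.1; changing only the build file is not enough.

```scala
// build.sbt (sketch): Scala version is an assumption; keep it on 2.11.x
// to match the _2.11 Spark artifacts used by the question.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Supplied by the cluster at runtime, so the cluster must also be on 2.1.1.
  "org.apache.spark" %% "spark-sql"            % "2.1.1" % "provided",
  // The Kafka 0.10 source for Structured Streaming; must ship with the job.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.1"
)
```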
