由于Kafka主题与正在阅读的Kafka主题不同而导致spark流失败

jdg4fx2g  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(360)

对于以下写主题/读主题 air2008rand 串联:

import org.apache.spark.sql.streaming.Trigger
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "air2008rand")
.load()
.groupBy('value.cast("string").as('key))
.agg(count("*").cast("string") as 'value)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("includeTimestamp", true)
.option("topic","t1")
.trigger(Trigger.ProcessingTime("2 seconds"))
.outputMode("update")
.option("checkpointLocation","/tmp/cp")
.start)

由于主题不同而生成错误 air2008m1-0 :

scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0] 
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone. 
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)

通过停止读/写代码(在sparkshell中),可以重复此行为 repl )然后重新运行它。
为什么不同的Kafka主题之间会有“相声”?

w46czmvw

w46czmvw1#

问题是由于检查点目录包含来自早期spark流操作的数据。解决方法是更改检查点目录。
在这个问题[illegalstateexception]中,这个解决方案是作为一个注解(来自@jaceklaskowski本人)找到的:spark structured streaming是带有错误的终止流查询

相关问题