我们有使用Spark结构流实施的流应用程序,试图从Kafka主题读取数据,并将其写入到HDFS位置。
有时应用程序会失败,并出现异常:
_spark_metadata/0 doesn't exist while compacting batch 9
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
我们无法解决这个问题。
我找到的唯一解决方案是删除检查点位置文件,这将使作业在我们再次运行应用程序时从头开始读取主题/数据。然而,这对于生产应用来说并不是一个可行的解决方案。
有没有人可以在不删除检查点的情况下解决此错误,以便我可以从上次运行失败的位置继续运行?
应用程序的示例代码:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()
[...] // do some processing
dfProcessed.writeStream
.format("csv")
.option("format", "append")
.option("path",hdfsPath)
.option("checkpointlocation","")
.outputmode(append)
.start
暂无答案!
目前还没有任何答案,快来回答吧!