这是我用spark结构化流媒体读取Kafka数据的代码,
//ss:SparkSession is defined before.
import ss.implicits._
val df = ss
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_server)
.option("subscribe", topic_input)
.option("startingOffsets", "latest")
.option("kafkaConsumer.pollTimeoutMs", "5000")
.option("failOnDataLoss", "false")
.load()
这是错误代码,
Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds
如果我把5000扩大到10000,这个错误仍然会发生。我用谷歌搜索这个问题。关于这个问题似乎没有太多的相关信息。
以下是sbt文件中与此问题相关的部分。
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
1条答案
按热度按时间dzjeubhm1#
我也有这个错误。
我看了Kafka索的源代码,什么都没有。
我猜kafka连接器有问题,因此我在“spark-sql-kafka-0-10_.11”包中排除了kafka客户端,并添加了一个新的依赖项,如下所示:
现在起作用了。希望有帮助。
我创建了一个jira问题来报告这个问题:https://issues.apache.org/jira/browse/spark-23829
2018年12月17日更新:spark 2.4和kafka2.0解决了问题。