sparkstreaming+kafka:在投票60000次后未能获得记录

3qpi33ja  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(424)

我在Kafka上做Spark流。流式处理作业开始正常,运行了几个小时,然后出现以下问题:
17/05/18 03:44:47错误执行器:任务8.0在阶段1864.0中出现异常(tid 27968)java.lang.assertionerror:Assert失败:未能获取spark-executor-c10f4ea9-a1c6-4a9f-b87f-8d6ff66e10a5 madlytics-rt\ u 1 3 1150964759在scala.predef$.assert(predef)上轮询60000后的记录。scala:170)在org.apache.spark.streaming.kafka010.cachedkafconsumer.get(cachedkafconsumer。scala:74)在org.apache.spark.streaming.kafka010.kafkardd$kafkarditerator.next(kafkardd。scala:227)在org.apache.spark.streaming.kafka010.kafkardd$kafkarditerator.next(kafkardd。scala:193)在scala.collection.iterator$$anon$11.next(iterator。scala:409)在scala.collection.iterator$$anon$11.next(迭代器。scala:409)在scala.collection.iterator$$anon$11.next(iterator。scala:409)在scala.collection.iterator$$anon$13.hasnext(iterator。scala:462)在scala.collection.iterator$$anon$11.hasnext(iterator。scala:408)在scala.collection.iterator$$anon$13.hasnext(iterator。scala:461)在scala.collection.iterator$$anon$13.hasnext(迭代器。scala:461)在scala.collection.iterator$$anon$13.hasnext(iterator。scala:461)在scala.collection.iterator$$anon$11.hasnext(iterator。scala:408)在scala.collection.iterator$$anon$13.hasnext(iterator。scala:461)在scala.collection.iterator$$anon$11.hasnext(iterator。scala:408)在scala.collection.iterator$$anon$13.hasnext(迭代器。scala:461)在scala.collection.iterator$$anon$11.hasnext(iterator。scala:408)在org.apache.spark.util.collection.externalsorter.insertall(externalsorter。scala:192)在org.apache.spark.shuffle.sort.sortshufflewriter.write(sortshufflewriter。scala:63)在org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask。scala:79)在org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask。scala:47)在org.apache.spark.scheduler.task.run(task。scala:86)在org.apache.spark.executor.executor$taskrunner.run(executor。scala:274)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1142)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:617)在java.lang.thread.run(线程。java:745)
另外,我增加了 heartbeat.interval.ms , session.timeout.ms 以及 request.timeout.ms 正如这里所建议的:https://issues.apache.org/jira/browse/spark-19275
以下是一些相关配置:

batch.interval = 60s
spark.streaming.kafka.consumer.poll.ms = 60000
session.timeout.ms = 60000 (default: 30000)
heartbeat.interval.ms = 6000 (default: 3000)
request.timeout.ms = 90000 (default: 40000)

另外,kafka集群是一个5节点的集群,我正在读的主题有15个分区。Kafka的其他配置如下:

num.network.threads=8
num.io.threads=8

任何帮助都将不胜感激。谢谢。

nc1teljy

nc1teljy1#

我用一个简单的配置更改解决了这个问题,这个更改非常明显,但我花了一些时间才意识到如何不处理这种默认(mis)配置。
主要问题是spark配置 spark.streaming.kafka.consumer.poll.ms (kafkardd中默认为512ms)或 spark.network.timeout (默认120秒,如果 spark.streaming.kafka.consumer.poll.ms 是不是设置)总是小于Kafka消费 request.timeout.ms (kafka newconsumerapi中的默认值为305000ms)。。。因此,spark轮询总是在kafka使用者请求/轮询发生超时之前超时(当kafka主题中没有可用记录时)。
简单地增加 spark.streaming.kafka.consumer.poll.ms 比Kafka还大 request.timeout.ms 应该会成功的。也调整Kafka消费 max.poll.interval.ms 总是少于 request.timeout.ms .
q、 祝你好运。

6jygbczu

6jygbczu2#

根据我的经验,这种特殊的失败是Kafka集群过载的一个症状。通常的嫌疑犯都是饥肠辘辘的。
除此之外,表面上看Kafka一切都很好,但也许不是。
在添加分区后,是否需要花费大量时间重新平衡?还是因为您执行的所有负载测试而维护了一个庞大的偏移量主题?
曾经发生在我身上的是,星团在表面上很好,但这个超时出现在这里和那里。在一个全新的,甚至更小的集群上,这个问题消失了。

相关问题