kafka使用者在iterator.hasnext()上停留(实际上是暂停和恢复),即使主题中有大量消息可供使用

czq61nw1  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(199)

我正在开发一个分布式解决方案,其中两个使用者在同一使用者组下的两个不同服务器上运行,并使用一个3机kafka主题,其中包含2个分区和3个复制因子。在我的消费者类(这是一个 Callable ),关键部分如下所示:

@Override
public Object call() throws Exception {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    try {            
        while (it.hasNext()){
            byte[] message = it.next().message();
            // other code here
        }
    } catch (Throwable e) {
        e.printStackTrace();
    }
    log.error("Shutting down Thread: " + streamNumber + ", kafka consumer offline!!!");
}

我的consumer类还派生了16个其他线程来处理已消耗的消息。当我在两个不同的服务器上启动两个消费者时,在开始的几分钟内,每个消费者似乎都会无缝地使用kafka主题中的消息(每个分区一个)。然而,经过一段时间后,每个消费者似乎都陷入了困境 while (it.hasNext()) 语句,即使每个分区中还有数千条消息要使用。下面的屏幕截图显示了Kafka消费抵消在这一点上的地位。

如您所见,消费者远远落后于主题中可用的消息数量。从我的日志来看,当这个消耗线程被暂停时,其他线程运行正常,正在执行它们的任务。从长远来看,有趣的是,我还注意到消耗线程在一段时间后会暂停和恢复。但是,每次暂停时,下次使用的消息数也会可笑地减少。例如,在我第一次启动这两个消费者之后,每一个似乎都无缝地消费了大约15000条消息,直到被流迭代器卡住,然后暂停了大约20-25分钟,又消费了大约5000条消息,然后又暂停了大约30分钟,又消费了大约100条消息。如果我停止消费进程并重新启动,整个循环似乎会重复。
以下是我正在使用的使用者配置:

group.id=ct_job_backfill     
zookeeper.session.timeout.ms=1000
zookeeper.sync.time.ms=200
auto.commit.enable=true
auto.offset.reset=smallest
rebalance.max.retries=20
rebalance.backoff.ms=2000
topic.name=contentTaskProd

消费者服务器是在linux上运行的每台32线程64 gb的机器。
知道是什么引起的吗?提前谢谢。如果您需要更多信息或有任何不清楚的地方,请告诉我。
更新:我已经尝试将分区的数量从2增加到32,并且在我的每个消费服务器中产生16个消费线程,每个消费线程从一个分区中消费。然而,这似乎并没有改变这种行为。我注意到同样的暂停和恢复周期。

bd1hkmkf

bd1hkmkf1#

我遇到了完全相同的问题。在浏览决议时,我遇到了Kafka在https://issues.apache.org/jira/browse/kafka-2978.
看起来它们已在0.9.0.1版本中解析。我将尝试用这个版本更新库。如果我能用新的jar解决这个问题,将会更新。同时你也可以试试~干杯

相关问题