消费者陷入重新加入

emeijp43  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(289)

我已经阅读了其他线程,并且通过使用一个新的组id绕过了这个问题,但是我想知道是什么导致了这个问题。
我有一个有16个分区的主题,我设置了session.timeout.ms=30000,max.poll.interval.ms=30000000。
我运行我的程序,并按ctrl+c组合键,所以它不能正常关闭。我猜,16次之后,我陷入了重新加入的问题。session.timeout.ms是心跳超时,所以在30秒之后,它应该会让我的消费者感到高兴,而我的分区应该会“释放”出来,对吧?还是只听max.poll.interval.ms?
编辑:我仍然间歇性地得到这个错误,当它发生时,我必须重新启动我的所有消费者。即使当我的消费者运行良好,然后他们都开始陷入重新加入(没有添加/删除消费者)的困境时,这种情况也会发生。下面是一个错误日志,当我尝试连接到它后,与一个新的消费者,当它卡在该状态:
https://pastebin.com/axjeshkp

2017-06-29 17:28:16,215 DEBUG [AbstractCoordinator] - [scheduler-1] - Sending JoinGroup ((type: JoinGroupRequest, groupId=ingestion-matching-kafka-consumer-group-dev1, sessionTimeout=30000, rebalanceTimeout=43200000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@b45e5583)) to coordinator kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 (id: 2147483644 rack: null)

2017-06-29 17:37:21,261 DEBUG [NetworkClient] - [scheduler-1] - Node 2147483644 disconnected.
2017-06-29 17:37:21,263 DEBUG [ConsumerNetworkClient] - [scheduler-1] - Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=19,client_id=ingestion-matching-kafka-consumer-dev1} with correlation id 19 due to node 2147483644 being disconnected

这些是我认为相关的第一条和最后一条信息。以下是我设置的相关超时:

session.timeout.ms=30000
max.poll.interval.ms=43200000    
request.timeout.ms=43205000 # the docs said to keep this higher than max.poll.interval.ms
enable.auto.commit=false

我也应该设置heartbeat.interval.ms吗?这是不是消费者在某个后台线程中自动将心跳发送到代理的间隔时间(我已经阅读了文档,但由于某些原因我不能完全理解它)?

wd2eg0qa

wd2eg0qa1#

我知道这是一个很老的问题,但我有类似的问题,最后我明白了这种情况的原因,并想与大家分享。
当重新平衡开始时,kafka等待组中的所有使用者poll()并发送joingroup请求。重新平衡超时等于max.poll.interval.ms。因此,kafka会等待每个消费者的重新平衡超时或流程结束。
在本例中,您将max.poll.interval.ms设置为12小时。唯一合理的理由是你必须有一个漫长的过程。所以当重新平衡开始时,Kafka会等到你的过程完成或者12个小时过去。这就是为什么你的消费者似乎陷入困境。

k2fxgqgv

k2fxgqgv2#

如果您的客户端没有正确断开连接(crash或sigint),服务器将需要session.timeout.ms(在您的情况下是30秒)才能将其从组中踢出。在此期间,服务器仍会认为使用者是组的一部分,因此不会进行任何重新分配。一旦延迟结束,分配的分区将被重新分配给其他使用者(如果有的话)。
当然,如果您使用新的组id,则不会发生这种情况。虽然每次开发时都会尝试使用新组(因为您不必等待),但您会丢失前一个组所提交的任何偏移量,这可能并不代表您的应用程序在生产环境中运行时的状态。
关于max.poll.interval.ms,它是消费逻辑中对poll()的两次调用之间允许的最大延迟。我认为这个设置与这个问题无关。

相关问题