Kafka消息丢失,因为后来的消息

yiytaume  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(640)

所以我和我的Kafka消费者有了一些恼人的补偿委托案。我的项目使用“Kafka节点”。我创建了一个主题。在2台服务器上的使用者组中创建了2个使用者。自动提交设置为false。对于我的消费者得到的每一个mesage,他们都会启动一个异步进程,这可能需要1~20秒,当进程完成时,消费者会提交偏移量。。我的问题是:有一个senarios,消费者1得到一条消息,需要20秒来处理。在处理过程中,他收到另一条消息,需要1秒才能处理。他完成第二个消息处理,提交偏移量,然后立即崩溃。导致上一个消息处理失败。如果我重新运行consumer,他不会再阅读第一条消息,因为第二条消息已经提交了offst,它大于第一条消息。我怎样才能避免这种情况?

Kafkaconsumer.on('message', async(message)=>{
await SOMETHING_ASYNC_1~20SEC;
Kafkaconsumer.commit(()=>{});
});
yyhrrdl8

yyhrrdl81#

实际上,您希望通过利用 async.queue .
创建 async.queue 具有消息处理器和并发性(消息处理器本身用 setImmediate 因此不会冻结事件循环)
设置 queue.drain 恢复消费者
消费者的 message 事件暂停使用者并将消息推送到队列。
Kafka节点自述在这里详细介绍了这一点。
与您的问题类似的示例实现可以在这里找到。

相关问题