谢谢你花时间回答这个问题。我正在使用kafka和python消费者。当消费者启动并运行时,一切都很好,消息被推送到kafka,然后由消费者阅读。
但是,如果消费者因为任何原因而下降,当它恢复时,它只会在消费者恢复后读取发布到kafka的新消息。关机和开机之间的消息丢失,也就是说,耗电元件在恢复后没有读取这些消息。
consumer = KafkaConsumer(..)
是我用来创建消费者的。
谢谢你花时间回答这个问题。我正在使用kafka和python消费者。当消费者启动并运行时,一切都很好,消息被推送到kafka,然后由消费者阅读。
但是,如果消费者因为任何原因而下降,当它恢复时,它只会在消费者恢复后读取发布到kafka的新消息。关机和开机之间的消息丢失,也就是说,耗电元件在恢复后没有读取这些消息。
consumer = KafkaConsumer(..)
是我用来创建消费者的。
1条答案
按热度按时间ncgqoxb01#
你用的是什么客户?可能有必要为耗电元件设置起始偏移量。看看seek()函数和auto commit设置。也许我的代码有帮助,但也许我们使用不同的消费类(mine:http://kafka python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):