kafka使用者在关闭后丢失消息的状态

tf7tbtn2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(317)

谢谢你花时间回答这个问题。我正在使用kafka和python消费者。当消费者启动并运行时,一切都很好,消息被推送到kafka,然后由消费者阅读。
但是,如果消费者因为任何原因而下降,当它恢复时,它只会在消费者恢复后读取发布到kafka的新消息。关机和开机之间的消息丢失,也就是说,耗电元件在恢复后没有读取这些消息。

consumer = KafkaConsumer(..)

是我用来创建消费者的。

ncgqoxb0

ncgqoxb01#

你用的是什么客户?可能有必要为耗电元件设置起始偏移量。看看seek()函数和auto commit设置。也许我的代码有帮助,但也许我们使用不同的消费类(mine:http://kafka python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):

def connect(self):
        '''Initialize Kafka Client and Consumer.'''
        try:
            print "Try to init KafkaClient:", self.Brokers
            self.__kafka_client = KafkaClient( self.Brokers )

            print "Try to init Kafka Consumer."
            self.__consumer = SimpleConsumer(
                    self.__kafka_client,
                    self.GroupID,
                    self.Topic,
                    auto_commit = True,
                    partitions=self.Partitions,
                    auto_commit_every_n = 100,
                    auto_commit_every_t=5000,
                    fetch_size_bytes=4096,
                    buffer_size=4096,
                    max_buffer_size=32768,
                    iter_timeout=None,
                    auto_offset_reset='largest' )

            print "Set the starting offset."
            self.__consumer.seek(0, self.OffsetMode)

self.__consumer.seek(0, 0) =>start reading from the beginning of the queue.
self.__consumer.seek(0, 1) =>start reading from current offset.
self.__consumer.seek(0, 2) =>skip all the pending messages and start reading only new messages (**maybeyour case**).

相关问题