kafka消费者获取以前使用过的消息

wnvonmuf  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(947)

我在xd中有一个consumer作业,它将在接收到由其他producer作业生成的消息后完成。我每天都会触发这些任务。我发现这个消费者有时会收到一条以前被消费过的信息。
日志如下:


#### OK

2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1561658772877, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561658700108, timestamp is 1561658772877 ==================

#### NG

2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561799100282, timestamp is 1561399136840 ==================

#### OK

2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 2, offset = 5, CreateTime = 1561817817702, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561817528447, timestamp is 1561817817702 ==================

#### NG

2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1562004300372, timestamp is 1561399136840 ==================

看起来它多次收到offset=0消息。
kakfa版本(1.0.0)
使用者手动提交偏移量。(consumer.commitsync();)
仅设置以下属性:

bootstrap.servers  
auto.offset.reset=earliest  
group.id  
client.id
Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put("auto.offset.reset", "earliest");
    config.put("group.id", group);
    config.put("client.id", config.getProperty("group.id") + "_" + System.currentTimeMillis());
    config.put("enable.auto.commit", false);
    try {
        consumer = new KafkaConsumer<>(config);
        consumer.subscribe(tList);
        while (true) {
            ConsumerRecords<?, ?> records = consumer.poll(10000);
            for (ConsumerRecord<?, ?> record : records) {
                //.........
                consumer.commitSync();
            }
            if (matched)
                break;
        }
    } finally {
        consumer.close();
    }
zwghvu4y

zwghvu4y1#

在kafka 1.1中,默认情况下,偏移量只保留24小时 offsets.retention.minutes 设置为1440。
因此,如果您停止您的消费者超过24小时,在重新启动时,有可能承诺的补偿将被删除,迫使消费者使用 auto.offset.reset 找一个新职位。
因为这对很多人来说太短了,从Kafka2.0开始, offsets.retention.minutes 现在设置为10080(7天)。
您应该更改代理配置以允许更长时间保留偏移量,或者更新到最新的kafka版本。

evrscar2

evrscar22#

尝试设置auto.offset.reset=latest,这样重新启动后,使用者将在最新提交的偏移量之后开始消费。
更多信息请点击此处https://kafka.apache.org/documentation/#consumerconfigs

相关问题