如何使用java从apachekafka开始使用所有消息

x33g5p2x  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(286)

我正在尝试使用apachekafka中一个主题开头的所有消息。我可以消费在那一刻产生的信息。这是我获取消息的代码。

public void consume() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("\"%s\"\n", record.value());
        }
    }
}
axzmvihb

axzmvihb1#

除了设置, auto.offset.reset=earliest ,尝试为属性设置新的/随机值 group.id 试试看。此外,如果你不想跟踪消费者的立场,但总是想从一开始,你也可以设置 enable.auto.commit=false 以避免污染主题。
希望有帮助。
谢谢。

sxissh06

sxissh062#

控制台使用者生成一个随机使用者组id以完成此任务,并设置 auto.offset.reset=earliest 确保关闭consumer对象以防止在zookeeper中添加大量临时使用者组id

cczfrluj

cczfrluj3#

订阅主题后,可以使用 seekToBeginning 方法以设置主题分区开头的偏移量。当然,它对每个分区都有效,因为具有不同分区的主题具有不同的开始偏移量(如果消息被删除)。

相关问题