我正在尝试使用apachekafka中一个主题开头的所有消息。我可以消费在那一刻产生的信息。这是我获取消息的代码。
public void consume() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("\"%s\"\n", record.value());
}
}
}
3条答案
按热度按时间axzmvihb1#
除了设置,
auto.offset.reset=earliest
,尝试为属性设置新的/随机值group.id
试试看。此外,如果你不想跟踪消费者的立场,但总是想从一开始,你也可以设置enable.auto.commit=false
以避免污染主题。希望有帮助。
谢谢。
sxissh062#
控制台使用者生成一个随机使用者组id以完成此任务,并设置
auto.offset.reset=earliest
确保关闭consumer对象以防止在zookeeper中添加大量临时使用者组idcczfrluj3#
订阅主题后,可以使用
seekToBeginning
方法以设置主题分区开头的偏移量。当然,它对每个分区都有效,因为具有不同分区的主题具有不同的开始偏移量(如果消息被删除)。