如何使用Sping Boot 从一开始就每秒消耗一个Kafka主题的所有Kafka消息?

clj7thdc  于 7个月前  发布在  Apache
关注(0)|答案(1)|浏览(67)

我是Kafka的新手,我需要使用spring Boot 从头到尾连续地(每秒)打印来自Kafka主题的所有消息。
也需要一次得到所有的消息,而不是一个接一个。
我使用了.. auto.offset.reset=earliest和enable.auto.commit=false和random/new group.id,但对我来说什么都不起作用:(

@KafkaListener(topics = "retry-events", groupId = "my-consumer-group")
    public void consumeMessage(String message) {
        System.out.println("Received message: " + message);
    }

字符串
在这里,如果我给Kafka发送一条消息,那么它打印的是最新的消息。我需要每秒钟从开始一遍又一遍地打印每一条消息。

cgfeq70w

cgfeq70w1#

默认情况下,Kafka为每个消费者组/分区维护一个最后提交的偏移指针。
当消费者启动时,默认行为是从该偏移量开始。auto.offset.reset=earliest仅在消费者没有提交的偏移量时适用(通常是消费者第一次启动时)。
要始终从头开始,您需要在分配分区时执行查找操作。
当使用Spring时,这可以很容易地通过让侦听器扩展AbstractConsumerSeekAware并在分配分区时调用seekToBeginning()来完成。
请访问https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek

相关问题