Kafka听众,获取所有信息

xxb16uws  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(247)

大家好。我有一个Kafka项目使用SpringKafka什么听一个明确的主题。我需要一天听一次所有的信息,把它们收集起来,然后在那里找到特定的信息。我不明白如何用一个@kafkalistener方法来阅读所有的信息。我的班级是:

@Component
public class KafkaIntervalListener {

    public CountDownLatch intervalLatch = new CountDownLatch(1);
    private final SCDFRunnerService scdfRunnerService;

    public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
        this.scdfRunnerService = scdfRunnerService;
    }

    @KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
    public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
        System.out.println("Recieved interval message: " + event);
        IntervalType type = event.getType();
        Instant instant = event.getInterval();
        List<IntervalEvent> events = new ArrayList<>();
        events.add(event);
        events.size();

        this.intervalLatch.countDown();
    }

}

我的事件集合的大小始终为1;我试着使用不同的循环,但后来,我的收藏变成了同一条消息的530000次归档。
更新:我已经找到了一种方法来使用factory.setbatchlistener(true);但我需要找到launch it with@scheduled(cron=“${kafka.cron}”,zone=“europe/moscow”)。现在这种方法总是在听。现在我试着这样做:

@Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
    kafkaIntervalListener.intervalLatch.await();
}

它不工作,在调试模式下我的断点从来没有工作在这个网站上。

fivyi3re

fivyi3re1#

侦听器容器在设计上是消息驱动的。
对于按需获取消息,最好使用kafka Consumer 直接访问api并使用 poll() 方法。

相关问题