webflux和kafka中创建批量消费

sg24os4d  于 2021-07-15  发布在  Kafka
关注(0)|答案(1)|浏览(292)

我需要对Kafka进行民意调查并批量处理事件。在React堆Kafka,因为它是一个蒸汽api,我得到的事件流。有没有一种方法可以组合并获得固定的最大事件大小。
这就是我目前正在做的。

final Flux<Flux<ConsumerRecord<String, String>>> receive = KafkaReceiver.create(eventReceiverOptions)
    .receiveAutoAck();
receive
    .concatMap(r -> r)
    .doOnEach(listSignal -> log.info("got one message"))
    .map(consumerRecords -> consumerRecords.value())
    .collectList()
    .flatMap(strings -> {
      log.info("Read messages of size {}", strings.size());
      return processBulkMessage(strings)
          .doOnSuccess(aBoolean -> log.info("Processed records"))
          .thenReturn(strings);
    }).subscribe();

但是代码只是挂在collectlist之后,永远不会转到最后一个flatmap。
提前谢谢。

np8igboo

np8igboo1#

你只要用你的平原做一个“压平” .concatMap(r -> r) 因此,您完全消除了最初由它构建的批处理 receiveAutoAck() . 有一个列表流为您的 processBulkMessage() 要处理,请考虑将所有批处理逻辑移到 concatMap() :

.concatMap(batch -> batch
                    .doOnEach(listSignal -> log.info("got one message"))
                    .map(ConsumerRecord::value)
                    .collectList())
            .flatMap(strings -> {

相关问题