kafka-spring批处理使用者-提交单偏移量

f5emj3cl  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(335)

我在springboot中遇到了一个kafka批处理侦听器问题。

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);

props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, this.maxFetchBytesMaxPartition);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.maxFetchBytesMax);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, ByteArrayDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBuffer);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minFetch);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, maxWaitFetch);
return props;
}

@Bean
public DefaultKafkaConsumerFactory<String, byte[]> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();

    try {
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.setBatchListener(true);
        factory.getContainerProperties().setSyncCommits(false);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    } catch(Exception e) {
        logger.error("Error KafkaListenerContainerFactory: {}", e.getMessage());
    }

    return factory;
}

所以,这是@kafkalistener

@KafkaListener(autoStartup = "${kafka-startup}", groupId = "${kafka-group}", topics = "${queue}",
        containerFactory = "kafkaListenerContainerFactory", concurrency = "${concurrency}")
public void listen(@Payload List<byte[]> messages,
                   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                   @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                   Acknowledgment ack) throws Exception {

    int indexQueue = new Random().nextInt(queues.size());

    for (int i = 0; i < messages.size(); i++) {
        //Do somethings
        ack.acknowledge();
    }
}

这个解决方案,对于我的问题不起作用,因为 ack.acknowledge() 批量提交。对于我的解决方案,我需要提交单个消息的偏移量。
我试着用 KafkaConsumer<String, byte[]> consumerconsumer.commitAsync() ,但情况是一样的。为了测试它,脚本读取一个批(由3mex组成):在第30个消息包时,脚本启动一个异常。
例如:消息1->偏移量10;消息2->偏移量11,消息3->偏移量12
脚本正在读取:
消息1(偏移量10)->确定
消息2(偏移量11)->确定
消息3(偏移量12)->异常
在下一个循环中,脚本重新读取了偏移量为10的消息1,但我希望看到偏移量为12的消息3。
你有什么想法吗?你能帮帮我吗?
谢谢您

btxsgosb

btxsgosb1#

这个 Acknowledgment 对于批处理侦听器,只应调用一次。
您现在可以(从2.3开始)呼叫 acknowledgment.nack(thisOneFailed, sleep); 看到了吗https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-偏移量
从版本2.3开始 Acknowledgment 接口有两个附加方法 nack(long sleep) 以及 nack(int index, long sleep) . 第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发illegalstateexception。

相关问题