kafka commitasync使用提交顺序重试

lf5gs5x2  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(536)

我正在阅读Kafka的权威指南,在关于消费者的一章中,有一个关于“重试异步提交”的简介:
为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
作者的一个简短的例子对于像我这样的人来说是很好的。我特别不清楚我在上面加粗的部分。
有谁能阐明这意味着什么,或者更好地提供一个玩具例子来演示这一点?

kd3sttzy

kd3sttzy1#

我是这么想的,但如果谦虚一点,我可能错了

try {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(5);
            for (ConsumerRecord<String, String> record : records) {
                System.out.format("offset: %d\n", record.offset());
                System.out.format("partition: %d\n", record.partition());
                System.out.format("timestamp: %d\n", record.timestamp());
                System.out.format("timeStampType: %s\n", record.timestampType());
                System.out.format("topic: %s\n", record.topic());
                System.out.format("key: %s\n", record.key());
                System.out.format("value: %s\n", record.value());
            }

            consumer.commitAsync(new OffsetCommitCallback() {
                private int marker = atomicInteger.incrementAndGet();
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception != null) {
                        if (marker == atomicInteger.get()) consumer.commitAsync(this);
                    } else {
                        //Cant' try anymore
                    }
                }
            });
        }
    } catch (WakeupException e) {
        // ignore for shutdown
    } finally {
        consumer.commitSync(); //Block
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }

相关问题