java—kafka 0.11中sendoffsetstotransaction的含义

piwo6bdm  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(409)

新的kafka版本(0.11)支持一次语义。
https://cwiki.apache.org/confluence/display/kafka/kip-98+-+exactly+once+delivery+and+transactional+messaging
我有一个生产者的设置与Kafka事务代码在java中像这样。

producer.initTransactions();
    try {
        producer.beginTransaction();
        for (ProducerRecord<String, String> record : payload) {
            producer.send(record);
        }

        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
            {
                put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
            }
        };
        producer.sendOffsetsToTransaction(groupCommit, "groupId");
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }

我不太清楚如何使用sendoffsetstotransaction和它的预期用例。另外,消费组是消费端的多线程读取功能。
javadoc说
“将消耗的偏移量列表发送给使用者组协调器,并将这些偏移量标记为当前事务的一部分。只有在事务提交成功的情况下,才会将这些偏移量视为已使用。当您需要将已使用的消息和已生成的消息一起批处理时,应该使用此方法,通常是在“使用-转换-生成”模式中
生产部门如何维护已消耗补偿的列表?这有什么意义?

u1ehiz5o

u1ehiz5o1#

这只与您正在使用的工作流相关,然后根据您使用的内容生成消息。此函数允许您仅在下游生成成功时提交消耗的偏移量。如果您使用数据,以某种方式对其进行处理,然后生成结果,这将在整个使用/生产过程中启用事务性保证。
如果没有事务,通常使用 Consumer#commitSync() 或者 Consumer#commitAsync() 提交消费者补偿。但是,如果您在使用生产者制作之前使用这些方法,那么在知道生产者是否成功发送之前,您就已经提交了偏移量。
因此,您可以使用 Producer#sendOffsetsToTransaction() 而不是由生产者提交偏移量。这会将偏移发送到处理事务的事务管理器。只有当整个事务消费和生产成功时,它才会提交偏移量。
(注意:发送要提交的偏移量时,应在上次读取的偏移量中添加1,以便将来的读取从尚未读取的偏移量恢复。无论您是向消费者还是生产者承诺,这都是正确的。请参阅:kafkaproducer sendfoffsetstotransaction need offset+1以成功提交当前偏移量)。

相关问题