我们正在努力为我们的生产商实现交易。我们的用例是从mq接收消息并发布到kafka。当出现故障时,我们需要回滚发布到kafka的消息,而不向mq发送确认。
当我们使用事务时,我们会看到kafka主题中的消息是重复的。
@Bean("producerConfig")
public Properties producerConfig() {
LOGGER.info("Creating Dev Producer Configs");
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
configs.put(ProducerConfig.ACKS_CONFIG, "all");
configs.put(ProducerConfig.RETRIES_CONFIG, 1);
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return configs;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(new HashMap<String, Object>((Map) producerConfig));
producerFactory.setTransactionIdPrefix("spring-kafka-transaction");
return producerFactory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
@Bean
KafkaTransactionManager<String,String> kafkaTransactionManager(){
KafkaTransactionManager<String, String> transactionManager = new KafkaTransactionManager<>(producerFactory());
return transactionManager;
}
侦听器方法
@Component
public class WMQListener implements MessageListener {
KafkaTemplate<String, String> kafkaTemplate;
@Override
@Transactional
public void onMessage(Message message) {
String onHandXmlStr = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
onHandXmlStr = textMessage.getText();
}
LOGGER.debug("Message Received from WMQ :: " + onHandXmlStr);
Msg msg = JaxbUtil.convertStringToMsg(onHandXmlStr);
List<String> onHandList = DCMUtil.convertMsgToList(msg);
ListenableFuture send = kafkaTemplate.sendDefault(onHandList.get(0));
send.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(Object result) {
System.out.println(result);
}
});
message.acknowledge();
}
1条答案
按热度按时间vsmadaxz1#
然而,我想知道为什么偏移量增加了2
因为kafka主题是一个线性日志(每个分区),所以回滚消息仍然占用日志中的一个槽(猜测)。
想想这个。。。
p1.发送(tx)(偏移量23)
p2.发送(tx)(偏移量24)
p1.回滚
p2.提交
p1.重发(tx)(偏移量25)。
p1.提交。
我的猜测是p1在偏移量23处的记录被简单地标记为回滚,而不是发送给消费者(除非在使用read\u uncommitted隔离写入时处于活动状态)。
编辑
我认为有/没有交易的抵消没有区别
和
但是,是的,在失败的情况下,会使用一个额外的插槽。。。
和
在下一次跑步中。。。
如果我删除了异常,在下一次运行时
因此,是的,事务本身似乎占用了日志中的一个槽。
所以这并不是一个重复的消息-你可能可以在他们的设计文档中看到。