前提:使用的是spring-cloud-starter-stream-rocketmq 2.2.3版本
- 自定义发送消息的通道
public interface ProducerChannel {
String OBJECT_TX = "object_tx";
@Output(ProducerChannel.OBJECT_TX)
MessageChannel objectTx();
}
使用了注解@EnableBinding({ProducerChannel.class})
- application.yml配置binding信息
spring:
cloud:
stream:
bindings:
#事务消息
object_tx:
destination: object_tx_msg
producer:
transactional: true
group: myTxProducerGroup
- 实现RocketMQLocalTransactionListener
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println(JSON.toJSONString(msg) + " " + JSON.toJSONString(arg));
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println(msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
}
但是,object_tx通道设置的producer属性没有被加载到ExtendedProducerProperties里面,导致事务消息没有成功。
1条答案
按热度按时间2exbekwf1#
please use the new version #2029