使用Spring Cloud Stream + RabbitMQ(功能)发送消息后手动确认

u4dcyp6a  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(678)

I found how to do manual ack with functional way.
但是,这并没有解释如何在消息之后手动发送确认消息。
如果在确认和发送消息之间有网络错误,我想消息会丢失。
我不想失去消息。
版本

  • Spring Boot :2.7.1
  • Spring Cloud:3.2.4

这是我当前的代码

@Controller
class MyFunction(private val myService: MyService) {
    private val objectMapper = ObjectMapper()

    init {
        objectMapper.registerKotlinModule()
    }

    @Bean
    fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
        val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
        val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()

        val request = objectMapper.readValue(it.payload, MyRequest::class.java)

        try {
            val response = myService.process(request)

            channel.basicAck(deliveryTag, false)
            // I think it can potentially cause message loss
            MessageBuilder.withPayload(result)
                .build()
        } catch (ex: Exception) {
            channel.basicNack(deliveryTag, false, false)
            throw ex
        }
    }
}
jfewjypa

jfewjypa1#

使用Consumer来接收消息,使用StreamBridge来发送消息(而不是使用Function
但是为什么你需要手动确认呢?抛出异常会导致容器在自动模式下对消息进行nack(并使用正常退出进行确认)。

相关问题