I found how to do manual ack with functional way.
但是,这并没有解释如何在消息之后手动发送确认消息。
如果在确认和发送消息之间有网络错误,我想消息会丢失。
我不想失去消息。
版本
- Spring Boot :2.7.1
- Spring Cloud:3.2.4
这是我当前的代码
@Controller
class MyFunction(private val myService: MyService) {
private val objectMapper = ObjectMapper()
init {
objectMapper.registerKotlinModule()
}
@Bean
fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()
val request = objectMapper.readValue(it.payload, MyRequest::class.java)
try {
val response = myService.process(request)
channel.basicAck(deliveryTag, false)
// I think it can potentially cause message loss
MessageBuilder.withPayload(result)
.build()
} catch (ex: Exception) {
channel.basicNack(deliveryTag, false, false)
throw ex
}
}
}
1条答案
按热度按时间jfewjypa1#
使用
Consumer
来接收消息,使用StreamBridge
来发送消息(而不是使用Function
。但是为什么你需要手动确认呢?抛出异常会导致容器在自动模式下对消息进行nack(并使用正常退出进行确认)。