RabbitMQ:如何处理连接丢失后不需要的重复un-ack消息?

ccgok5k5  于 7个月前  发布在  RabbitMQ
关注(0)|答案(2)|浏览(95)

在我的应用程序(多个示例)中,我们偶尔会看到由于网络问题(我的应用程序和rabbitmq都是活动的),我的应用程序和rabbitmq之间的连接丢失的情况,然后在连接恢复(重新建立)后,我们将收到未确认的消息。
这给我们带来了一个问题,因为我的应用程序并没有死,它仍然在处理它之前收到的相同消息,但现在消息被重新传递,它导致应用程序再次处理消息(这对我们来说可能是致命的)。
由于应用有多个示例,一个示例不容易检查另一个示例是否在同时处理同一条消息。我们不能简单地过滤掉重新发送的消息,因为我们需要此功能来处理示例/应用崩溃/重新部署。
似乎没有一个API来告诉rabbitmq何时不重新发送未确认的消息。
那么,处理这种情况的推荐做法是什么?
谢谢你,

pgvzfuti

pgvzfuti1#

这种情况的一般解决方案是让消费者以幂等的方式处理消息。(如果消息体中没有唯一标识符)我向消息体添加一个属性idempotencyId,它是一个guid,在消费者端,每个消息的这个id都根据数据库中的存储值进行验证,拒绝任何重复。
这种方法也适用于可能从另一个集群铲来的消息,或者如果在同一个集群中,多个消费者示例正在侦听,那么这种方法也可以保证一次性处理。
建议阅读RabbitMQ可靠性指南here

dfuffjeb

dfuffjeb2#

是的,exactly-once 交付不是RabbitMQ擅长的。事实上,我想说你可能不应该使用它来解决这类问题。老实说,真正解决这个问题的唯一方法是使用分布式事务或锁定。
无论如何,你可以在消费者收到消息后,在它开始处理消息之前,通过确认消息来解决这个问题。这至少可以避免RabbitMQ相关的重复问题。这是 * 最多一次 * 交付。
当然,这意味着如果消费者崩溃,消息将永远丢失。因此,您需要在确认消息之前将其持久化,以便稍后可以恢复它,并且消费者应该在完成后删除它。
考虑到崩溃是罕见的,你可以有一个专门的进程来处理那些持久化的消息,或者手动处理它们。
请注意,您正在将复制问题推到您面前,因为消费者可能无法在完成对持久化消息的处理后删除它,但至少您可以选择以任何方式实现它。
在这种情况下,存储可以是任何东西,从文件,RDBMS或类似ZooKeeper或Redis的东西,以锁定/解锁飞行中的消息。

相关问题