rabbitmq 如何让鼠兔首先监听优先级队列?

sxpgvts3  于 7个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(77)

我有两个通过rabbitmq通信的微服务,我需要实现优先级消息。
第一个微服务充当发布者,使用symfony + messenger(amqp传输)编写。
第二个微服务充当消费者,用python + pika编写。
messenger文档(https://symfony.com/doc/current/messenger.html#prioritized-transports)建议为不同的消息优先级使用单独的队列,此组件无法使用rabbitmq的内置功能来区分消息的优先级。实际上发布者没有问题,我配置了它,以便必要的消息进入优先级队列。
消费者出现了问题,我不能让pika先读取优先级队列,然后读取常规队列。
下面是我的messenger组件配置的一个例子:

framework:
    messenger:
        transports:
            priority:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: priority
                    queues:
                        priority: ~
            normal:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: normal
                    queues:
                        normal: ~
        routing:
            'App\Message\PriorityRequest': priority
            'App\Message\NormalRequest': normal

这就是我如何填充队列:

for ($i = 0; $i < 10; $i++) {
    $bus->dispatch(new PriorityRequest($i, 'priority'));
    $bus->dispatch(new NormalRequest($i, 'normal'));
}

下面是一个python + pika中的消费者实现的例子:

import pika
import os

def do_work(self, connection, channel, delivery_tag, body):
   print(body)

parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
channel.basic_qos(prefetch_count=1)

channel.queue_declare(queue='priority', durable=True)
channel.queue_declare(queue='normal', durable=True)

channel.basic_consume(queue='priority', on_message_callback=do_work, auto_ack=True)
channel.basic_consume(queue='normal', on_message_callback=do_work, auto_ack=True)

channel.start_consuming()

如果我们运行消费者代码,我们会得到以下输出:

{'id': 0, 'data': 'priority'}
{'id': 0, 'data': 'normal'}
{'id': 1, 'data': 'priority'}
{'id': 1, 'data': 'normal'}
{'id': 2, 'data': 'priority'}
{'id': 2, 'data': 'normal'}
{'id': 3, 'data': 'priority'}
{'id': 3, 'data': 'normal'}
{'id': 4, 'data': 'priority'}
{'id': 4, 'data': 'normal'}
{'id': 5, 'data': 'priority'}
{'id': 5, 'data': 'normal'}
{'id': 6, 'data': 'priority'}
{'id': 6, 'data': 'normal'}
{'id': 7, 'data': 'priority'}
{'id': 7, 'data': 'normal'}
{'id': 8, 'data': 'priority'}
{'id': 8, 'data': 'normal'}
{'id': 9, 'data': 'priority'}
{'id': 9, 'data': 'normal'}

消息以FIFO顺序处理,我如何强制pika首先处理优先级队列中的消息,只有当优先级队列为空时才能进入正常队列?

mkshixfv

mkshixfv1#

Pika开箱即用不支持此功能。
一个选项是首先从优先级队列中删除basic_consume。当队列为空时,取消该消费者,然后从另一个队列中删除basic_consume。当工作完成后,重复并返回到优先级队列。

相关问题