RabbitMQ: Using a Pika channel across multiple threads

oipij1gg  posted 8 months ago  in  RabbitMQ

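It is clear that a single Pika connection cannot be used across multiple threads, but can a channel of that connection be used across multiple threads? I got the error below when I tried, so the answer appears to be no. For reference: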

INFO  2019-02-07 13:14:12,927 pika.connection _on_terminate  2095: Disconnected from RabbitMQ at 127.0.0.1:5672 (505): UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

nle07wnf  #1

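It is clear that a single Pika connection cannot be used across multiple threads, but can a channel of that connection be used across multiple threads?
I'm one of Pika's maintainers, and no, you can't use a connection or a channel across threads. This is documented.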
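
One common way to follow that rule is to give every thread its own connection and channel. The sketch below is only an illustration, not code from the Pika documentation: `get_channel` and `pika_connect` are hypothetical helper names, and the broker address `127.0.0.1` is assumed from the log line in the question.

```python
import threading

# Per-thread storage: each thread sees its own attributes on this object.
_local = threading.local()


def get_channel(connect):
    """Return the calling thread's private channel.

    `connect` is any zero-argument callable that opens a new connection
    (hypothetical helper API, not part of Pika).
    """
    if not hasattr(_local, "channel"):
        _local.connection = connect()                 # opened by this thread only
        _local.channel = _local.connection.channel()  # used by this thread only
    return _local.channel


def pika_connect():
    """Open a BlockingConnection to a local broker (assumed address)."""
    import pika  # imported lazily so the helper above has no hard dependency
    return pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
```

A worker thread would call `get_channel(pika_connect)` and publish or consume on the returned channel; no connection or channel object ever crosses a thread boundary.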


plicqrtu  #2

I do it as shown below:

Example using PIKA consumer without blocking thread  - PIKA and GRPC Streaming

###########
import queue      # this module is named "Queue" on Python 2
import threading

    # (method of the gRPC servicer class)
    def grpc_test(self, request, context):
        # Response-streaming gRPC implementation: the client gets a stream of messages.

        message_queue = queue.Queue()

        def rmq_callback(data):
            # Runs on the consumer thread; hands the body over to the gRPC thread.
            print("Got a callback from RMQ client")
            message_queue.put(data)

        # Register with RabbitMQ for data.
        # Thread safe: this call creates its own connection and channel.
        pikaconsumer = TestConsumer()
        # The client wants to listen on this queue.
        # Arguments: queue, exchange, routing keys, callback ("xxxx" values are placeholders).
        pikaconsumer.listen_on_queue("xxxx", "xxxx", ["xxxx"], rmq_callback)
        # Use the connection and channel in a new thread (and in no other thread).
        t = threading.Thread(target=pikaconsumer.start_consuming)
        t.start()

        while True:
            # Block until the consumer thread delivers the next message body.
            data = message_queue.get(True)
            message = proto.Data()
            message.ParseFromString(data)
            yield message

###########

import logging
from functools import partial

import pika

LOGGER = logging.getLogger(__name__)


class TestConsumer(object):

    def __init__(self):
        # The host part of this URL is mangled in the source; substitute your broker's address.
        amqp_url = 'amqp://guest:[email protected]:5672/'
        parameters = pika.URLParameters(amqp_url)
        connection = pika.BlockingConnection(parameters)
        self._channel = connection.channel()

    def listen_on_queue(self, queue_name, exchange, routing_keys, _callback):
        # In case the queue is not there, create it.
        self._channel.queue_declare(queue=queue_name, auto_delete=True)
        for routing_key in routing_keys:
            self._channel.queue_bind(queue_name, exchange, str(routing_key))
            LOGGER.info('Binding Exchange[%s] to Queue[%s] with RoutingKey[%s]',
                        exchange, queue_name, str(routing_key))

        def on_message(channel, method_frame, header_frame, body, callback=None):
            # Invoked on the consuming thread, so acking on this channel is safe here.
            print(method_frame.delivery_tag)
            callback(body)
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # Pika 0.x argument order (callback, queue).
        # Pika 1.0+ expects basic_consume(queue, on_message_callback) instead.
        self._consumer_tag = self._channel.basic_consume(
            partial(on_message, callback=_callback), queue_name)

    def start_consuming(self):
        self._channel.start_consuming()
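
The consumer above only ever touches its channel from the consuming thread. If another thread (for example the gRPC handler) needs to ack a message or stop the consumer, one option is to hand that request to the connection's own thread. The sketch below assumes Pika 0.12 or later, where `BlockingConnection.add_callback_threadsafe` is available; `ack_from_other_thread` and `stop_from_other_thread` are hypothetical helper names, and the `connection` object would have to be kept by the consumer (the class above does not store it).

```python
import functools


def ack_from_other_thread(connection, channel, delivery_tag):
    """Schedule basic_ack to run on the connection's own thread.

    Only add_callback_threadsafe is called from the foreign thread;
    the channel method itself executes on the thread that drives
    the connection's event loop.
    """
    connection.add_callback_threadsafe(
        functools.partial(channel.basic_ack, delivery_tag=delivery_tag))


def stop_from_other_thread(connection, channel):
    """Ask the consuming thread to leave start_consuming()."""
    connection.add_callback_threadsafe(channel.stop_consuming)
```

With this approach the streaming handler could call `stop_from_other_thread(...)` when the client disconnects, so the consumer thread does not keep running after the stream ends.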
