RabbitMQ和GCP Pub/Sub集成

4bbkushb  于 7个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(68)

我正在集成RabbitMQ和GCP Pub/Sub。我正在使用Pika库与Python。连接由BlockingConnection建立。
我的消费者作为单独的线程运行。最大预期消息工作负载高达100 msg/s。
根据我执行的一些初步测试,似乎解决方案是工作,但我有一些怀疑回调函数是如何构造的。我调用publish方法发布到GCP中的Pub/Sub,然后在块中尝试,basic_ack处理消息。
如果有人在这方面有经验,我可以要求给予一些意见,我的解决方案,并可能是一些例子,它可以如何实施。

def consume_data_callback(self, basic_deliver, body):
        
        # ... some code

        future = self.publisher.publish(topic_path, self.payload.SerializeToString())
        try:
            message_id = future.result(timeout=1)
            self.channel.basic_ack(basic_deliver.delivery_tag)
        except Exception as e:
            future.cancel()
            _logger.error("Result after publishing Pub/Sub with: {}".format(e))

字符串
谢谢你的回答。

yb3bgrhw

yb3bgrhw1#

我个人还没有尝试过,但我确实搜索了可能对你有帮助的参考资料。这篇RabbitMQ-发布/订阅文章有一个回调代码示例,可以帮助你确认你所写的是否是一个合适的解决方案。
GitHub完整代码版本:

#!/usr/bin/env python
import os
import pika
import sys

def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        print(f" [x] {body.decode()}")

    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

字符串

相关问题