python—从消息中心的主题检索消息

wpcxdonn  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(274)

我正在尝试使用合流的kafka python从bluemix上的消息中心的一个主题获取消息。我的代码在下面,但有些东西不工作。主题和消息中心已启动并正在运行,因此代码中可能有一些内容。

from confluent_kafka import Producer, KafkaError, Consumer

consumer_settings = {
'bootstrap.servers': 'broker-url-here',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'ssl',
'sasl.username': 'username-here',
'sasl.password': 'password-here',
}

c = Consumer(**consumer_settings)
c.subscribe(['topic-here'])

running = True

while running:
    msg = c.poll()
    if msg.error():
        print("Error while retrieving message")
        c.close()
        sys.exit(10)
    elif (msg is not None):
        for x in msg:
            print(x)
    else:
        sys.exit(10)

当我运行代码时,它似乎被困在 msg = c.poll() . 所以我猜要么是无法连接,要么是无法检索消息。凭证本身是正确的。

vyu0f0g1

vyu0f0g11#

消费逻辑看起来不错,但是消费的配置不正确。 security.protocol 需要设置为
sasl_ssl ssl.ca.location 需要指向包含可信证书的pem文件。每个操作系统的文件位置不同,但最常见的是:
bluemix/ubuntu版本: /etc/ssl/certs 红帽: /etc/pki/tls/cert.pem macos公司: /etc/ssl/certs.pem 我们还有一个使用此客户端的示例应用程序,可以轻松启动或部署到bluemix:https://github.com/ibm-messaging/message-hub-samples/tree/master/kafka-python-console-sample

相关问题