如何看到(新的)消费者连接到一个特定的kafka主题

x33g5p2x  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(338)

我有一个新消费者(python消费者)的列表。我可以使用以下命令检索组:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

我可以为每一个主题,他们是连接到

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092  --describe --group TheFoundGroupId

如何获取与某个主题相关的所有组(最好是所有消费者,即使不在组中)?
除了将其作为shell命令运行之外,还有其他方法可以从python访问它吗?

wkftcu5l

wkftcu5l1#

谢谢你问这个问题。
所有使用者配置,如使用者组ID,哪个使用者订阅哪个主题存储在zookeeper中。
运行下面的命令以连接到zookeeper
/垃圾箱/Zookeeper外壳localhost:2181
然后跑
ls/消费者
你将得到所有的消费者群体,这是目前。如果你不提供消费群体也。Kafka将随机分配消费群体。对于控制台使用者,它将分配console-consumer-xx id
您可以从下面的python代码片段中获得所有使用者组
安装zookeeper python客户端

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

# get all consumer groups

consumer_groups = zk.get_children("/consumers")
print("There are %s consumer group(s) with names %s" % (len(consumer_groups), consumer_groups))

# get all consumers in group

for consumer_group in consumer_groups:
    consumers = zk.get_children("/consumers/"+consumer_group)
    print("There are %s consumers in %s consumer group. consumer are : %s" % (len(consumers), consumer_group, consumers))

获取与某个主题相关的消费者或消费者组。
获取/consumers/consumergroup\u id/id/consumer\u id/
会给你像这样的输出

{"version":1,"subscription":{"test":1},"pattern":"white_list","timestamp":"1514218381246"}

在subscription object下,用户订阅的所有主题。根据您的用例实现逻辑
谢谢

iugsix8n

iugsix8n2#

这不是最好的解决方案,但由于似乎没有人有答案,下面是我最终如何解决的(为消费者分配一个组并替换\uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu

import subprocess
    import os
    if "KAFKA_HOME" in os.environ:
        kafkapath = os.environ["KAFKA_HOME"]
    else:
        kafkapath = oms_cfg.kafka_home
        # error("Please set up $KAFKA_HOME environment variable")
        # exit(-1)

    instances = []
    # cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server {} --list'.format(oms_cfg.bootstrap_servers)
    # result = subprocess.run(cmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    igr = ____YOURGROUP_____ # or run over all groups from the commented out command
    print("Checking topics of consumer group {}".format(igr))
    topic_cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server ' + oms_cfg.bootstrap_servers + ' --describe --group {gr}'
    result = subprocess.run(topic_cmd.format(gr=igr).split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    table = result.stdout.split(b'\n')
    # You could add a loop over topic here
    for iline in table[1:]:
        iline = iline.split()
        if not len(iline):
            continue
        topic = iline[0]
        # we could check here for the topic. multiple consumers in same group -> only one will connect to each topic
        # if topic != oms_cfg.topic_in:
        #     continue
        client = iline[-1]
        instances.append(tuple([client, topic]))
        #    print("Client {} Topic {} is fine".format(client, topic))
    if len(instances):
        error("Cannot start. There are currently {} instances running. Client/topic {}".format(len(instances),
                                                                                                  instances))
        exit(-1)

相关问题