kafka consumer-consumer进程和线程与主题分区的关系是什么

jexiocij  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(380)

我最近一直在与Kafka合作,对消费者群体下的消费者有些困惑。混淆的中心是将使用者实现为进程还是线程。对于这个问题,假设我使用的是高级消费者。
让我们考虑一个我已经试验过的场景。在我的主题中有两个分区(为了简单起见,让我们假设复制因子仅为1)。我创造了一个消费者( ConsumerConnector )过程 consumer1 与组 group1 ,然后创建一个大小为2的主题计数Map,然后派生出2个使用者线程 consumer1_thread1 以及 consumer1_thread2 在这个过程中。看起来像 consumer1_thread1 正在使用分区 0 以及 consumer1_thread2 正在使用分区 1 . 这种行为总是确定性的吗?下面是代码片段。班级 TestConsumer 是我的消费线程类。

...
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(2));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...

现在,让我们考虑另一个场景(我还没有做过实验,但我很好奇),在这个场景中,我启动了两个消费进程 consumer1 以及 consumer2 都是同一组的 group1 每一个都是一个单线程进程。现在我的问题是:
在这种情况下,两个独立的使用者进程(在同一组下)将如何与分区相关?它与上面的单进程多线程场景有何不同?
一般来说,使用者线程或进程如何Map到主题中的分区/与之相关?
kafka文档确实指出,一个使用者组下的每个使用者将使用一个分区。但是,这是指使用者线程(如我上面的代码示例)还是独立的使用者进程?
关于将消费者实现为进程vs线程,我在这里遗漏了什么微妙的东西吗?提前谢谢。

ldfqzlk8

ldfqzlk81#

一个使用者组可以有多个正在运行的使用者示例(多个进程具有相同的 group-id ). 而每个分区只被组中的一个使用者示例使用。
e、 g.如果你的主题包含2个分区,并且你启动了一个消费者组 group-A 对于2个使用者示例,则每个示例都将使用来自主题特定分区的消息。
如果使用不同的组id启动相同的2个使用者 group-A & group-B 然后来自主题的两个分区的消息将广播到每个分区。在这种情况下,运行在 group-A 将包含来自主题的两个分区的消息,对于 group-B 也。
在他们的文档中阅读更多关于这个的信息
编辑:根据你的评论,
我想知道在同一个进程下有两个使用者线程与在两个使用者进程下有两个使用者线程(两种情况下组是相同的)之间的有效区别是什么
消费者 group-id 在集群中是相同的/全局的。假设您启动了一个进程,其中有两个线程,然后派生出另一个进程(可能在另一台计算机中),该进程具有相同的groupid,并且还有两个线程,那么kafka将添加这两个新线程以使用来自主题的消息。因此,最终将有4个线程负责消费来自同一主题的内容。然后kafka会触发一个重新平衡,将分区重新分配给线程,所以对于线程正在使用的特定分区,可能会发生这种情况 T1 of process P1 可以分配给线程使用 T2 of process P2 . 下面几行摘自wiki页面
当一个新进程以相同的使用者组名称启动时,kafka会将该进程的线程添加到可用于使用该主题的线程集中,并触发“重新平衡”。在这个重新平衡过程中,kafka会将可用分区分配给可用线程,可能会将一个分区移动到另一个进程。如果您有新旧业务逻辑的混合体,则可能会有一些消息转到旧逻辑。

kyks70gy

kyks70gy2#

感谢@user2720864的详细回答,但我认为回答中提到的@user2720864重新分配案例是不正确的=>一个分区不能被两个使用者使用。
当有更多的使用者(与分区相比)时,每个分区将只独占地分配给一个使用者,而剩余的使用者将保持懒惰,直到一些工作的使用者死亡或从组中移除。
根据Kafka消费者文件:
在kafka中实现消耗的方法是将日志中的分区划分到消耗示例上,这样每个示例在任何时间点都是分区“公平共享”的独占消耗者。这个保持组成员身份的过程是由kafka协议动态处理的。如果新示例加入组,它们将从组的其他成员那里接管一些分区;如果一个示例死亡,它的分区将被分配给其余的示例。
以及“用户组和主题订阅”部分的api规范:
这是通过平衡使用者组中所有成员之间的分区来实现的,这样每个分区就只分配给组中的一个使用者。

5jdjgkvh

5jdjgkvh3#

选择具有相同id的多个消费者组示例与选择单个消费者组示例的主要设计决策是弹性。例如,如果你有一个有两个线程的单一消费者,那么如果这台机器坏了,你就失去了所有消费者。如果您有两个具有相同id的独立使用者组,每个使用者组位于不同的主机上,则它们可以在故障中生存。理想情况下,在上述情况下,每个使用者组应该有两个线程,因此,如果一个主机宕机,另一个使用者组将使用其休眠线程占用另一个分区。实际上,总是希望有比分区更多的线程来覆盖这个因素。
您可以在不同的主机上运行每个使用者组。对于给定名称/id的单个使用者组,它将只在单个主机上运行,因为它在单个运行时环境中管理其所有线程。
kafka有一个算法来确定哪些线程/使用者组读取不同的主题分区。Kafka试图以一种有弹性的方式均匀地分配这些。当使用者组失败时,它会让其他组中的其他线程读取给定的分区。
指使用者组中的单个线程。如果线程比分区多,那么其中一些线程将保持休眠状态,直到其他线程无法提供弹性。
这种偏好与弹性有关。因此,通过使用相同id设置多个使用者组,我可以在多个主机上运行,从而使我的应用程序能够容忍失败。

相关问题