为什么我的kafka消费者具有相同的组id,却没有得到平衡?

ygya80vv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(226)

我正在编写一个概念验证应用程序来使用来自apachekafka 0.9.0.0的消息,看看是否可以使用它来代替常见的jms消息代理,因为kafka提供了很多好处。这是我的基本代码,使用新的消费者api:

public class Main implements Runnable {

    public static final long DEFAULT_POLL_TIME = 300;
    public static final String DEFAULT_GROUP_ID = "ltmjTest";

    volatile boolean keepRunning = true;
    private KafkaConsumer<String, Object> consumer;
    private String servers;
    private String groupId = DEFAULT_GROUP_ID;
    private long pollTime = DEFAULT_POLL_TIME;
    private String[] topics;

    public Main() {
    }

    //getters and setters...

    public void createConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configs.put("enable.auto.commit", "true");
        configs.put("auto.commit.interval.ms", "1000");
        configs.put("session.timeout.ms", "30000");

        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(asList(topics));
    }

    public static void main(String[] args) {
        Main main = new Main();
        if (args != null && args.length > 0) {
            for (String arg : args) {
                String[] realArg = arg.trim().split("=", 2);
                String argKey = realArg[0].toLowerCase();
                String argValue = realArg[1];
                switch (argKey) {
                case "polltime":
                    main.setPollTime(Long.parseLong(argValue));
                    break;
                case "groupid":
                    main.setGroupId(argValue);
                    break;
                case "servers":
                    main.setServers(argValue);
                    break;
                case "topics":
                    main.setTopics(argValue.split(","));
                    break;
            }
        }
        main.createConsumer();
        new Thread(main).start();
        try (Scanner scanner = new Scanner(System.in)) {
            while(true) {
                String line = scanner.nextLine();
                if (line.equals("stop")) {
                    main.setKeepRunning(false);
                    break;
                }
            }
        }
    }
}

我用默认设置启动了一个kafka服务器,用shell工具启动了一个kafka生产者 kafka-console-producer.sh 给我的主题写信息。然后我用这段代码连接两个使用者,发送适当的服务器进行连接,并发送主题进行订阅,其他所有内容都使用默认值,这意味着两个使用者都有相同的组id。我注意到只有一个使用者使用所有数据。我从官方教程中了解到,默认行为应该是服务器必须平衡消费者:
如果所有使用者示例都有相同的使用者组,那么这就像传统的队列平衡使用者的负载一样。
我怎样才能让消费者表现得像默认的那样呢?或者我错过了什么?

ef1yzkbh

ef1yzkbh1#

有一个trait kafka.consumer.partitionassignor说明了如何为每个使用者分配分区。它有两种实现:roundrobinassignor和rangeassignor。默认值是rangeassignor。
可以通过设置param“partition.assignment.strategy”来更改。
循环文件:
循环赋值器列出所有可用分区和所有可用使用者。然后继续执行从分区到使用者的循环分配。如果所有使用者示例的订阅都相同,则分区将均匀分布(i、 例如,假设有两个使用者c0和c1,两个主题t0和t1,每个主题有3个分区,从而得到分区t0p0、t0p1、t0p2、t1p0、t1p1和t1p2。赋值为:c0:[t0p0,t0p2,t1p1]c1:[t0p1,t1p0,t1p2]
范围赋值器文件
范围赋值器以每个主题为基础工作。对于每个主题,我们按数字顺序排列可用分区,按字典顺序排列使用者。然后,我们将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它不能平均分配,那么前几个消费者将有一个额外的分区。例如,假设有两个使用者c0和c1,两个主题t0和t1,每个主题有3个分区,从而得到分区t0p0、t0p1、t0p2、t1p0、t1p1和t1p2。赋值为:c0:[t0p0,t0p1,t1p0,t1p1]c1:[t0p2,t1p2]
因此,如果我们所有的主题只有一个分区,那么只有一个使用者可以工作

相关问题