分区多于线程

bvpmtnay  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(165)

我面临一些kafka java消费api的问题。
我在一个主题中有5个分区,所有的消息都在分区5中。现在,尝试创建一个具有4个线程的高级使用者。看起来,因为消息在分区5中,所以如果线程数小于5,我就不能使用消息。如果线程数为5,我就可以使用消息。
但文件上说,
如果分区数超过线程数,则某些线程将从多个分区接收数据
这样不行。是否缺少任何配置?
代码:-

public KafkaConsumerGroup(final MessagingApplicationContext messaingCtx, final MessageProcessor msgProcessor) {

            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
            this.topic = messaingCtx.getKafkaConfig().getString(TOPIC_STR);

            this.readerService = messaingCtx.getRederService();
            this.msgProcessor = msgProcessor;
        }

    public void run(final int threadsCount) {
            final Map<String, Integer> topicsMap = new HashMap<String, Integer>();
            topicsMap.put(topic, threadsCount);
            final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicsMap);
            final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                this.readerService.submit(new KafkaConsumer(stream, threadNumber, msgProcessor));
                threadNumber++;
            }
        }

private static ConsumerConfig createConsumerConfig() {

        final Properties props = new Properties();
        props.put("zookeeper.connect", "********");
        props.put("group.id", "testGrp");
        props.put("zookeeper.session.timeout.ms", 1000);
        props.put("zookeeper.sync.time.ms", 200);
        props.put("auto.commit.interval.ms", 1000);
        props.put("auto.offset.reset", "smallest");
        props.put("consumer.timeout.ms", -1);
        props.put("fetch.message.max.bytes", 4194304);
        props.put("rebalance.backoff.ms", 1000);
        props.put("rebalance.max.retries", 1);

        return new ConsumerConfig(props);
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题