SpringKafka单个主题的多个使用者使用不同的消息

5ssjco0h  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(317)

在我的spring boot kafka应用程序中,我有以下使用者配置:

@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

    return factory;
}

消费者:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

如果我理解正确的话,现在我有一个客户的示例。我想增加post消费者的数量,假设有5个消费者将从中消费不同(不相同)的消息 ${kafka.topic.post.send} 以加快消息消耗。
是不是跟加法一样简单 factory.setConcurrency(5); 给我的 postKafkaListenerContainerFactory() ,例如:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    factory.setConcurrency(5);

    return factory;
}

或者我需要做一些额外的工作来实现它?

siotufzp

siotufzp1#

Apache·Kafka不是这样工作的。一个想法是在同一个线程的同一个分区中总是有进程记录。那个 factory.setConcurrency(5); 是关于一个主题中有多少个分区。所以,如果你只有一个,这个属性不会带来任何价值。如果主题中有10个分区,那么springkafka将生成5个线程,每个线程将处理2个分区。
我想说的是,参考手册中非常清楚:
假设提供了6个主题分区,并发性为3;每个容器将得到2个分区。对于5个主题分区,2个容器将获得2个分区,第3个容器将获得1个分区。如果并发性大于topicpartitions的数量,那么并发性将被向下调整,这样每个容器将获得一个分区。
因此,如果您想拥有您所描述的这种并发性,您确实需要在您的主题中创建5个分区。只有在这之后,您才能并行处理同一主题中的记录。

相关问题