在我的spring boot kafka应用程序中,我有以下使用者配置:
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
消费者:
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
如果我理解正确的话,现在我有一个客户的示例。我想增加post消费者的数量,假设有5个消费者将从中消费不同(不相同)的消息 ${kafka.topic.post.send}
以加快消息消耗。
是不是跟加法一样简单 factory.setConcurrency(5);
给我的 postKafkaListenerContainerFactory()
,例如:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
factory.setConcurrency(5);
return factory;
}
或者我需要做一些额外的工作来实现它?
1条答案
按热度按时间siotufzp1#
Apache·Kafka不是这样工作的。一个想法是在同一个线程的同一个分区中总是有进程记录。那个
factory.setConcurrency(5);
是关于一个主题中有多少个分区。所以,如果你只有一个,这个属性不会带来任何价值。如果主题中有10个分区,那么springkafka将生成5个线程,每个线程将处理2个分区。我想说的是,参考手册中非常清楚:
假设提供了6个主题分区,并发性为3;每个容器将得到2个分区。对于5个主题分区,2个容器将获得2个分区,第3个容器将获得1个分区。如果并发性大于topicpartitions的数量,那么并发性将被向下调整,这样每个容器将获得一个分区。
因此,如果您想拥有您所描述的这种并发性,您确实需要在您的主题中创建5个分区。只有在这之后,您才能并行处理同一主题中的记录。