Spring Kafka: getting the assigned partitions

Posted by ar5n3qh5 on 2021-06-07 in Kafka

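I know I can find out which partition a given record came from, but is there a way to get, dynamically, the partitions that are assigned to the consumer at a particular moment? Perhaps I need to implement some listener to detect and keep track of the partition assignment?
I am using spring-kafka 1.3.2 with ConcurrentKafkaListenerContainerFactory and @KafkaListener.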

1tu0hz3e #1

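I did it a different way, using KafkaListenerEndpointRegistry:

```java
// Assumes kafkaListenerEndpointRegistry is an injected KafkaListenerEndpointRegistry.
List<Integer> partitions = new ArrayList<>();
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    // Containers created by ConcurrentKafkaListenerContainerFactory are concurrent containers
    // that wrap one KafkaMessageListenerContainer per consumer thread.
    List<? extends KafkaMessageListenerContainer<?, ?>> containers =
            ((ConcurrentMessageListenerContainer<?, ?>) messageListenerContainer).getContainers();

    List<TopicPartition> topicPartitions = containers.stream()
            .map(container -> container.getAssignedPartitions())
            .filter(Objects::nonNull) // may be null before partitions are assigned
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    partitions.addAll(topicPartitions.stream().map(TopicPartition::partition).collect(Collectors.toList()));
}
```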
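One thing to watch in the loop above: it keeps only the partition numbers, so partition 0 of two different topics becomes indistinguishable. A minimal sketch of grouping by topic instead is shown here. To keep it runnable without Kafka on the classpath, each partition is represented as a `Map.Entry<String, Integer>` (topic, partition), which is a stand-in for `TopicPartition`; the class name `PartitionsByTopic` and the method `group` are hypothetical.

```java
import java.util.Collection;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of spring-kafka.
class PartitionsByTopic {

    // Groups (topic, partition) pairs by topic. With real Kafka types the key and
    // value would come from TopicPartition.topic() and TopicPartition.partition().
    static Map<String, SortedSet<Integer>> group(Collection<Map.Entry<String, Integer>> assigned) {
        Map<String, SortedSet<Integer>> byTopic = new TreeMap<>();
        for (Map.Entry<String, Integer> tp : assigned) {
            byTopic.computeIfAbsent(tp.getKey(), topic -> new TreeSet<>()).add(tp.getValue());
        }
        return byTopic;
    }
}
```

Sorted collections are used only so that the result prints in a stable order; a plain HashMap would work as well.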

rmbxnbpk #2

Yes, you can register a rebalance listener:

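```java
// Note: ConsumerAwareRebalanceListener is available from spring-kafka 2.0 onward.
@Bean
public ConsumerAwareRebalanceListener rebalanceListener() {
    return new ConsumerAwareRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // 'partitions' holds the partitions just assigned to this consumer; record them here
        }
    };
}
```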

Then add it to the ConcurrentKafkaListenerContainerFactory:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

    // Register the rebalance listener on the container properties used for every listener container
    ContainerProperties props = factory.getContainerProperties();
    props.setConsumerRebalanceListener(rebalanceListener());

    return factory;
}
```
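To answer "what is assigned right now" from such a listener, the callbacks need somewhere to store the result. The following is a rough sketch of one way to do that, not something provided by spring-kafka: `AssignedPartitionTracker` and its methods are hypothetical names, and partitions are stored as `"topic-partition"` strings (the form `TopicPartition.toString()` produces) so that the example runs without Kafka on the classpath. It assumes the listener calls `onAssigned` from its assigned callback and `onRevoked` from its revoked callback.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical helper, not part of spring-kafka: remembers which partitions
// are currently assigned to the consumers of this application instance.
class AssignedPartitionTracker {

    // Rebalance callbacks run on consumer threads while readers may be on any
    // thread, so a concurrent set is used.
    private final Set<String> assigned = new ConcurrentSkipListSet<>();

    // Call from the listener's "partitions assigned" callback.
    void onAssigned(Collection<String> partitions) {
        assigned.addAll(partitions);
    }

    // Call from the listener's "partitions revoked" callback.
    void onRevoked(Collection<String> partitions) {
        assigned.removeAll(partitions);
    }

    // Returns a read-only copy of the assignment as of the time of the call.
    Set<String> snapshot() {
        return Collections.unmodifiableSet(new TreeSet<>(assigned));
    }
}
```

Inside the listener, the callback could pass something like `partitions.stream().map(TopicPartition::toString).collect(Collectors.toList())` to `onAssigned`. A single shared tracker also covers a concurrency greater than one, since each consumer thread only adds and removes its own partitions.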
