如何负载平衡多节点单Kafka消费应用程序?

v6ylcynt  于 2021-07-26  发布在  Java
关注(0)|答案(1)|浏览(297)

我有一个SpringBootKafka监听器,它只听一个主题和消费者列表。现在这个spring启动应用程序现在有3个节点。现在我需要确保列表消息的数量均衡地分配到所有3个节点。如何在我的应用程序中实现这一点?

zc0qhyus

zc0qhyus1#

在consumer中,group.id(consumerconfig.group\u id\u config)config提供负载平衡。发布到主题的每条记录都会传递到每个订阅使用者组中的一个使用者示例。
[组id][1]:
标识此使用者所属的使用者组的唯一字符串。如果使用者使用subscribe(主题)的组管理功能或基于kafka的偏移管理策略,则需要此属性。

import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaConsumerConfiguration {

        private Map<String, Object> consumerConfigs() {
            final Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumeer-group-id");
            return props;
        }

        @Bean
        public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() {
            final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false));
            return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() {
            final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(this.kafkaListenerConsumerFactory());
            return factory;
        }

    }

  [1]: https://kafka.apache.org/documentation/#consumerconfigs_group.id

相关问题