spring Sping Boot 中的多个Kafka生产者

xpcnnkqh  于 5个月前  发布在  Spring
关注(0)|答案(1)|浏览(72)

我有一个用例,它需要我有多个Kafka Producer(基于配置)。也就是说,如果我的配置有3个想要接收数据的租户,我想启动3个生产者(所有三个写入3个不同的集群)。
我尝试将我的Kafka配置设置为:

@Bean
  @Primary
  public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
    log.info("setting up kafka templates");
    final String ids = configuration.getIds();
    Map<String, KafkaTemplate<String, String>> kafkaTemplates = new HashMap<>();

    for (String id : ids.split(",")) {
      kafkaTemplates.put(id, kafkaTemplate(id));
    }
    return kafkaTemplates;
  }

  private KafkaTemplate<String, String> kafkaTemplate(String id) {
    log.info("setting up kafka template");
    try {
      Properties producerProperties = new Properties();
      try (InputStream kins = Files.newInputStream(new File("/opt/user-secrets/", id + ".properties").toPath())) {
        producerProperties.load(kins);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      Map props = producerProperties;
      Map<String, Object> props1 = (Map<String, Object>) props;
      ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props1);
      return new KafkaTemplate<>(producerFactory);
    } catch (Exception e) {
      log.error("e {}", e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

字符串
当我尝试在Producer服务中访问KafkaTemplate时,这导致了kafkaTemplates.get(id)上的NPE:

@Service
@Slf4j
@Getter
@Setter
public class KafkaProducerService {

  private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;

  private Schema avroSchema;

  public KafkaProducerService(Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
    this.kafkaTemplates = kafkaTemplates;
  }

  public void produce(String id, String topic, String key, VehicleHeartbeat message) {
    GenericRecord record = generateAvroRecord(message);
    log.info("generic record: {}", record);
    kafkaTemplates.get(id).send(topic, key, record.toString());
  }


我该如何处理Spring的Kafka?

vulvrdjw

vulvrdjw1#

可能与其他Map bean冲突。
你可以尝试在kafkaTemplates方法上替换@Bean,如下所示:

@Bean("myKafkaTemplatesMap")
@Primary
public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
    ...
}

字符串
然后在服务构造函数中,你用相同的名字来限定它,这样你就可以确保注入正确的bean:

@Service
public class KafkaProducerService {

    public KafkaProducerService(@Qualifier("myKafkaTemplatesMap") Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
        this.kafkaTemplates = kafkaTemplates;
    }
}

相关问题