How to configure Kafka topics in a Spring Boot project

r1wp621o  posted 5 months ago  in Spring

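With Flowable and Kafka both enabled, the application runs, but as soon as I configure the Kafka bean ConcurrentKafkaListenerContainerFactory, startup fails with an error: a bean of type 'org.springframework.kafka.core.KafkaOperations' could not be found. Consider defining a bean of type 'org.springframework.kafka.core.KafkaOperations' in your configuration.
I used the same configuration in another project that does not include Flowable, and there it runs without any problems.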

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand>
createRecurrenceCommandListenerContainerFactory(KafkaCreateRecurrenceErrorHandler errorHandler) {

    ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(errorHandler);
    factory.setConsumerFactory(createRecurrenceCommandConsumerFactory());
    factory.setReplyTemplate(kafkaTemplate());
    factory.setRecordFilterStrategy(consumerRecord -> {
        String messageType = new String(consumerRecord.headers().lastHeader(KafkaCustomHeaders.MESSAGE_TYPE).value());
        return !ConsumedMessageType.CREATE_RECURRENCE.toString().equals(messageType);
    });
    return factory;
}

public ConsumerFactory<String, CreateRecurrenceCommand> createRecurrenceCommandConsumerFactory() {
    return generateConsumerFactory(CreateRecurrenceCommand.class, AppConstants.CREATE_RECURRENCE);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

private <T> ConsumerFactory<String, T> generateConsumerFactory(Class<T> clazz, String groupId) {
    // Start from the default consumer factory's settings and override only the group id.
    Map<String, Object> props = new HashMap<>(defaultConsumerFactory.getConfigurationProperties());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ErrorHandlingDeserializer<T> errorHandlingDeserializer = new ErrorHandlingDeserializer<>((topic, data) -> {
        try {
            if (data == null) {
                return null;
            }
            return objectMapper.readValue(new String(data), clazz);
        } catch (JsonProcessingException e) {
            throw new DeserializationException("Failed to deserialise", data, false, e);
        }
    });
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
}


In this code, we expect to filter messages for a specific topic: CREATE_RECURRENCE
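
To make the intent of the filter explicit, here is a minimal plain-Java sketch of the same decision, kept free of Kafka types so it can be run on its own. `MessageTypeFilter` and `shouldDiscard` are hypothetical names that do not appear in the project. The sketch assumes that a record without the message-type header should be dropped; in the factory code above, a missing header would instead cause a NullPointerException, because `lastHeader` returns null when the header is absent.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
class MessageTypeFilter {

    // Returns true when a record should be DISCARDED, which is the same
    // convention RecordFilterStrategy.filter uses. headerValue is the raw
    // header payload and may be null when the header is missing.
    static boolean shouldDiscard(byte[] headerValue, String expectedType) {
        if (headerValue == null) {
            return true; // assumption: records without the header are dropped
        }
        String messageType = new String(headerValue, StandardCharsets.UTF_8);
        return !expectedType.equals(messageType);
    }

    public static void main(String[] args) {
        byte[] match = "CREATE_RECURRENCE".getBytes(StandardCharsets.UTF_8);
        byte[] other = "SOME_OTHER_TYPE".getBytes(StandardCharsets.UTF_8);
        System.out.println(shouldDiscard(match, "CREATE_RECURRENCE"));
        System.out.println(shouldDiscard(other, "CREATE_RECURRENCE"));
        System.out.println(shouldDiscard(null, "CREATE_RECURRENCE"));
    }
}
```

Inside the real lambda, the same null check could be applied to the result of `consumerRecord.headers().lastHeader(...)` before calling `value()`.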

rwqw0loc  #1

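I'm not sure what "Flowable" is, but it looks like you are trying to configure everything yourself instead of relying on Spring Boot auto-configuration. To that end, consider changing the name of your ConcurrentKafkaListenerContainerFactory bean to kafkaListenerContainerFactory.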
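
A minimal sketch of that rename, under a few assumptions: the class name `KafkaListenerConfig` is hypothetical, the value type is simplified to `Object` so the snippet compiles without the project's `CreateRecurrenceCommand` class, and the `ConsumerFactory` and `KafkaTemplate` beans are assumed to be defined elsewhere in the application. Spring Boot registers its own default listener container factory only when no bean named `kafkaListenerContainerFactory` exists, and `@KafkaListener` looks up that bean name unless its `containerFactory` attribute says otherwise. Whether this removes the KafkaOperations error in the Flowable project has not been verified here.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical configuration class, simplified from the question's code.
@Configuration
public class KafkaListenerConfig {

    // The bean name comes from the method name, so this bean is registered as
    // "kafkaListenerContainerFactory" and Spring Boot's default factory backs off.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory, // assumed to be defined elsewhere
            KafkaTemplate<String, Object> kafkaTemplate) {   // assumed to be defined elsewhere

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setReplyTemplate(kafkaTemplate);
        return factory;
    }
}
```

If the original method name has to stay, `@Bean(name = "kafkaListenerContainerFactory")` on the existing method gives the same bean name.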
