在可流动和Kafka启用的情况下,应用程序运行,但当我配置Kafka bean ConcurrentKafkaListenerContainerFactory时。它生成一个错误:找不到<'org.springframework.Kafka.core.KafkaOperations'。请考虑在配置中定义类型为'org.springframework.Kafka.core.KafkaOperations'的bean。>>
同样的配置,我在另一个没有可流动的项目中做了它。在那里,它运行没有任何问题。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand>
createRecurrenceCommandListenerContainerFactory(KafkaCreateRecurrenceErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(errorHandler);
factory.setConsumerFactory(createRecurrenceCommandConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
factory.setRecordFilterStrategy(consumerRecord -> {
String messageType = new String(consumerRecord.headers().lastHeader(KafkaCustomHeaders.MESSAGE_TYPE).value());
return !ConsumedMessageType.CREATE_RECURRENCE.toString().equals(messageType);
});
return factory;
}
public ConsumerFactory<String, CreateRecurrenceCommand> createRecurrenceCommandConsumerFactory() {
return generateConsumerFactory(CreateRecurrenceCommand.class, AppConstants.CREATE_RECURRENCE);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
字符串
private ConsumerFactory<String,T> generateConsumerFactory(Class clazz,String groupId){ Map<String,Object> props = new HashMap<>(defaultConsumerFactory. getExcitationProperties());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
ErrorHandlingDeserializer<T> errorHandlingDeserializer = new ErrorHandlingDeserializer<>((topic, data) -> {
try {
if (data == null) {
return null;
}
return objectMapper.readValue(new String(data), clazz);
} catch (JsonProcessingException e) {
throw new DeserializationException("Failed to deserialise", data, false, e);
}
});
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
}
型
在这段代码中,我们期望过滤特定主题的消息:CREATE_RECURRENCE
1条答案
按热度按时间rwqw0loc1#
我不确定什么是“可流动”,但看起来你试图自己配置一切,而不是依赖于Sping Boot 自动配置。为此,请考虑将
ConcurrentKafkaListenerContainerFactory
的名称更改为kafkaListenerContainerFactory
。