如何让kafka消费者处理localdate类型?

agyaoht7  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(255)

我把我的一块地从 stringLocalDate 类型,现在我的Kafka消费者无法处理它。在阅读了下面的错误之后,看起来我需要反序列化这个值。我想修改一下 consumerFactory 下面包括 LocalDateDeserializer 会解决这个问题,但还没有。我需要创建一个自定义反序列化程序,或者我可以修改我的系统中的一些配置吗 consumerFactory ?

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[...125]] from topic [Service.Topic]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDate` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

代码:

public ConsumerFactory<String, Request> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LocalDateDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
um6iljoc

um6iljoc1#

我认为这不是Kafka的问题,而是Jackson的问题。
默认情况下,Kafka使用jackson进行序列化/反序列化。你可以注册 JavaTimeModule 对于 ObjectMapper 建造 JsonDeserializer 用它。然后建立您的自定义 ConsumerFactory :

public ConsumerFactory<String, Request> consumerFactory() {

     ObjectMapper objectMapper = new ObjectMapper();
     objectMapper.registerModule(new JavaTimeModule());

     Map<String, Object> config = new HashMap<>();
     config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

     StringDeserializer keyDeserializer = new StringDeserializer();
     JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(objectMapper);

     ConsumerFactory<String, Request> factory = new DefaultKafkaConsumerFactory<>(config, keyDeserializer,  jsonDeserializer);
     return factory;
}

您必须将jackson-datatype-jsr310依赖项添加到您的项目中才能使 JavaTimeModule 无障碍。
如果您使用的是spring boot并且希望配置 ObjectMapper 在全球范围内,您可以执行以下操作,例如:

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, Request> consumerFactory(Jackson2ObjectMapperBuilder objectMapperBuilder) {

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(objectMapperBuilder.build());

        ConsumerFactory<String, Request> factory = new DefaultKafkaConsumerFactory<>(config, keyDeserializer,  jsonDeserializer);
        return factory;
    }

    @Bean
    public Jackson2ObjectMapperBuilder objectMapperBuilder() {
        Jackson2ObjectMapperBuilder builder = new Jackson2ObjectMapperBuilder();
        builder.modulesToInstall(new JavaTimeModule());
        return builder;
    }
}
qmb5sa22

qmb5sa222#

这解决了问题!我在我的字段声明中使用了下面的注解。

@JsonDeserialize(using = LocalDateDeserializer.class)
@JsonSerialize(using = LocalDateSerializer.class)

相关问题