序列化、反序列化和消息转换

x33g5p2x  于2021-03-14 发布在 Kafka  
字(12.8k)|赞(0)|评价(0)|浏览(1195)

概述

Apache Kafka提供了一个高级API,用于序列化和反序列化记录值及其密钥。它存在于org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T>抽象类中,其中包含一些内置实现。同时,我们可以使用ProducerConsumer配置属性指定序列化程序和反序列化程序类。下面的示例演示了如何执行此操作:

props.put(ConsumerConfig.KEY\_DESERIALIZER\_CLASS\_CONFIG, IntegerDeserializer.class); 

props.put(ConsumerConfig.VALUE\_DESERIALIZER\_CLASS\_CONFIG, StringDeserializer.class);

 ... 

props.put(ProducerConfig.KEY\_SERIALIZER\_CLASS\_CONFIG, IntegerSerializer.class); 

props.put(ProducerConfig.VALUE\_SERIALIZER\_CLASS\_CONFIG, StringSerializer.class);

对于更复杂或特殊的情况,KafkaConsumer提供重载构造函数,分别接受keyvalue的序列化和反序列化实例。

当您使用此API时, DefaultKafkaProducerFactoryandDefaultKafkaConsumerFactory还提供属性(通过构造函数或setter方法)以将自定义序列化和反序列化实例注入目标ProducerConsumer。此外,还可以通过构造函数传入Supplier<Serializer>Supplier<Deserializer>实例-这些Supplier在创建每个ProducerConsumer时被调用。

JSON

Spring Kafka还提供了基于Jackson JSON对象映射器的JsonSerializerJSONderializer实现。JsonSerializer允许以JSONbyte[]的形式编写任何Java对象。JsonDeserializer需要一个Class<?> targetType参数,允许将使用的byte[]反序列化为正确的目标对象。下面的示例演示如何创建JsonDeserializer

JsonDeserializer thingDeserializer = new JsonDeserializer<>(Thing.class);

您可以使用ObjectMapper自定义JsonSerializerJsonDeserializer。您还可以扩展它们以实现配置中的某些特定配置逻辑configure(Map<String, ?> configs, boolean isKey)方法。

从2.3版开始,默认情况下,所有支持JSON的组件都配置有一个JacksonUtils.enhancedObjectMapper()实例,该实例带有MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES功能禁用。此外,这样的实例还提供了用于自定义数据类型的众所周知的模块,例如Java时间和Kotlin支持。有关更多信息,请参见JacksonUtils.enhancedObjectMapper()官方文档。此方法还将org.springframework.kafka.support.JacksonMimeTypeModulefororg.springframework.util.MimeType对象序列化注册到纯字符串中,以便在网络上实现平台间兼容性。JacksonMimeTypeModule可以在应用程序上下文中注册为bean,它将自动配置到Spring BootObjectMapperinstance.实例中。
同样从版本2.3开始,JsonDeserializer提供了基于TypeReference的构造函数,以便更好地处理目标泛型容器类型。
从版本2.1开始,可以在记录Headers中传递类型信息,从而允许处理多个类型。此外,可以使用以下Kafka属性配置序列化程序和反序列化程序:

  • JsonSerializer.ADD_TYPE_INFO_HEADERS(默认true): 可以将其设置为false以禁用JsonSerializer上的此功能(设置addTypeInfo属性)。

  • JsonSerializer.TYPE_MAPPINGS(默认empty): 请参见Mapping Types

  • JsonDeserializer.USE_TYPE_INFO_HEADERS(默认true): 可以将其设置为false以忽略序列化设置的Headers

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认true):可以将其设置为false以保留序列化设置的Headers

  • JsonDeserializer.KEY_DEFAULT_TYPE: 如果不存在Headers信息,则回退反序列化的keys

  • JsonDeserializer.VALUE_DEFAULT_TYPE: 如果不存在Headers信息,则回退反序列化的values

  • JsonDeserializer.TRUSTED_PACKAGES(默认java.util,java.lang): 允许反序列化以逗号分隔的包模式列表。*意味着全部反序列化。

  • JsonDeserializer.TYPE_MAPPINGS(默认empty): 请参见Mapping Types.
    从版本2.2开始,反序列化将删除类型信息Headers(如果由序列化添加)。通过直接在反序列化或使用前面描述的配置属性将removeTypeHeaders属性设置为false,可以还原为以前的行为。
    当以编程方式构造序列化/反序列化以便在ProducerConsumer工厂中使用时,自版本2.3以来,您可以使用此API,它简化了配置。
    以下示例假设您使用的是Spring Boot:

@Bean
public DefaultKafkaProducerFactory pf(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(props,
        new JsonSerializer<>(MyKeyType.class)
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<>(MyValueType.class)
            .noTypeInfo());
}

@Bean
public DefaultKafkaConsumerFactory pf(KafkaProperties properties) {
    Map<String, Object> props = properties.buildConsumerProperties();
    DefaultKafkaConsumerFactory pf = new DefaultKafkaConsumerFactory(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonSerializer<>(MyValueType.class)
            .ignoreTypeHeaders());
}

映射类型

从2.2版开始,在使用JSON时,现在可以通过使用前面列表中的属性来提供类型映射。以前,您必须在序列化程序和反序列化程序中自定义类型映射器。映射由逗号分隔的token:className对列表组成。在出站时,有效负载的类名映射到相应的令牌。入站时,类型头中的标记映射到相应的类名。
以下示例创建一组映射:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");

对应的对象必须兼容

如果使用Spring Boot,可以在application.properties(或yaml)文件中提供这些属性。下面的示例演示了如何执行此操作:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

您只能使用属性执行简单配置。对于更高级的配置(例如在序列化和反序列化中使用自定义ObjectMapper),应使用接受预构建序列化和反序列化的ProducerConsumer工厂构造函数。以下Spring引导示例覆盖默认工厂:

@Bean
public ConsumerFactory<Foo, Bar> kafkaConsumerFactory(KafkaProperties properties,
    JsonDeserializer customDeserializer) {

    return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
        customDeserializer, customDeserializer);
}

@Bean
public ProducerFactory<Foo, Bar> kafkaProducerFactory(KafkaProperties properties,
    JsonSerializer customSerializer) {

    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        customSerializer, customSerializer);
}

还提供了setter,作为使用这些构造函数的替代方法。

从版本2.2开始,您可以显式配置反序列化程序以使用提供的目标类型,并通过使用具有布尔useHeadersIfPresent(默认情况下为true)的重载构造函数之一忽略头中的类型信息。下面的示例演示了如何执行此操作:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

委派序列化和反序列化

版本2.3引入了DelegatingSerializerDelegatingDeserializer,它们允许生成和使用具有不同键和/或值类型的记录。生产者必须将头DelegatingSerializer.SERIALIZATION选择器设置为用于选择要使用的序列化程序的选择器值;如果未找到匹配项,则引发IllegalStateException

对于传入记录,反序列化程序使用相同的头来选择要使用的反序列化程序;如果找不到匹配项或头不存在,则返回byte[]

您可以通过构造函数配置选择器到序列化/反序列化的映射,也可以使用DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG通过ProducerConsumer属性配置它。对于序列化,Producer属性可以是Map<String,Object>,其中key是选择器,value是序列化实例、序列化类或类名。属性也可以是逗号分隔的映射项字符串,如下所示。
对于反序列化程序,Consumer属性可以是Map<String,Object>,其中key是选择器,value是反序列化实例、反序列化类或类名。属性也可以是逗号分隔的映射项字符串,如下所示。
要使用属性进行配置,请使用以下语法:

producerProps.put(DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

然后,ProducerDelegatingSerializer.SERIALIZATION_SELECTOR头设置为thing1thing2

尝试反序列化

RetryingDeserializer使用委托DeserializerRetryTemplate在反序列化过程中,当委托可能出现暂时性错误(如网络问题)时,重试反序列化。

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

请参阅spring-retry了解带有重试策略、后退策略等RetryTemplate的配置。

Spring消息转换

尽管SerializerDeserializer的API从低级ConsumerandProducer的角度来看是非常简单和灵活的, 但是在使用@KafkaListenerSpring 集成时,您可能需要在Spring消息传递级别具有更大的灵活性。为了让您能够轻松地转换到org.springframework.messaging.Message,Kafka提供了一个带有带有MessagingMessageConverter实现及其JsonMessageConverter(和子类)定制的MessageConverter抽象。通过使用@KafkaListener.containerFactory()属性的AbstractKafkaListenerContainerFactorybean定义,您可以将MessageConverter直接注入到KafkaTemplate实例中。下面的示例演示了如何执行此操作:

@Bean
public KafkaListenerContainerFactory<?, ?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用@KafkaListener时,参数类型将提供给消息转换器,以帮助进行转换。

只有在方法级别声明了@KafkaListener批注时,才能实现此类型推断。对于类级别@KafkaListener,有效负载类型用于选择要调用的@KafkaHandler方法,因此在选择该方法之前,必须已对其进行了转换。

consumer端,可以配置JsonMessageConverter;它可以处理byte[]BytesString类型的ConsumerRecord,因此应该与ByteArrayDeserializerbyteDeserializerStringDeserializer一起使用。(byte[]Bytes更有效,因为它们避免了不必要的byte[]String转换)。如果愿意,还可以配置与反序列化对应的JsonMessageConverter的特定子类。
producer方面,当您使用KafkaTemplate.send(Message<?> message)方法(请参见UsingKafkaTemplate),必须配置与已配置的Kafka序列化兼容的消息转换器。

  • 带有StringSerializerStringJsonMessageConverter

  • 带有BytesSerializerBytesJsonMessageConverter

  • 带有ByteArraySerializerByteArrayJsonMessageConverter
    同样,使用byte[]Bytes更有效,因为它们避免了Stringbyte[]的转换。
    为了方便起见,从2.3版开始,框架还提供了一个StringOrBytesSerializer,它可以序列化所有三种value类型,以便可以与任何消息转换器一起使用。

使用Spring数据投影接口

从2.1.1版开始,您可以将JSON转换为Spring数据投影接口,而不是具体的类型。这允许对数据进行非常选择性和低耦合的绑定,包括从JSON文档中的多个位置查找值。例如,以下接口可以定义为消息负载类型:

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将用于在接收到的JSON文档中查找属性名作为字段。@JSON Path表达式允许自定义值查找,甚至可以定义多个JSON路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用配置了相应委托转换器的ProjectingMessageConverter(用于出站转换和转换非投影接口)。还必须将spring-data:spring-data-commonscom.jayway.json path:json path添加到类路径中。

使用ErrorHandlingDeserializer

当反序列化无法反序列化消息时,Spring无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,版本2.2引入了ErrorHandlingDeserializer2。此反序列化委托给真正的反序列化(key 或 value)。如果委托未能反序列化记录内容,则ErrorHandlingDeserializer2在包含原因和原始字节的头中返回空值和反序列化异常。当您使用记录级MessageListener时,如果ConsumerRecord包含key或value的反序列化异常头,则使用失败的ConsumerRecord调用容器的错误处理程序。记录不会传递给侦听器。
或者,您可以将ErrorHandlingDeserializer2配置为通过提供failedDeserializationFunction来创建自定义值,failedDeserializationFunction是函数Function<FailedDeserializationInfo, T>。这个函数被调用来创建一个T的实例,该实例以通常的方式传递给监听器。FailedDeserializationInfo类型的对象,包含提供给函数的所有上下文信息。您可以在头文件中找到反序列化异常(作为序列化的Java对象)。有关更多信息,请参阅ErrorHandlingDeserializer2Java文档

使用BatchMessageListener时,必须提供failedDeserializationFunction。否则,该批记录不是类型安全的。

您可以使用DefaultKafkaConsumerFactory构造函数,该构造函数接受key和valueDeserializer对象,并连接使用合适的委托配置的ErrorHandlingDeserializer2实例.或者,可以使用Consumer配置属性(由ErrorHandlingDeserializer使用)来实例化委托。属性名为ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。下面的示例演示如何设置这些属性:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

下面的示例使用failedDeserializationFunction.

public class BadFoo extends Foo {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {

  @Override
  public Foo apply(FailedDeserializationInfo info) {
    return new BadFoo(info);
  }

}

前面的示例使用以下配置:

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedFooProvider.class);
...

使用批处理侦听器的负载转换

使用批处理侦听器容器工厂时,还可以在BatchMessagingMessageConverter中使用JsonMessageConverter来转换批处理消息。有关详细信息,请参阅序列化、反序列化、消息转换消息转换
默认情况下,转换的类型是从侦听器参数推断出来的。如果使用DefaultJackson2TypeMapper配置JsonMessageConverter,并将其TypePrecedence设置为TYPE_ID(而不是默认的INFERRED),则转换器将使用headers(如果存在)中的类型信息。例如,这允许用接口而不是具体类来声明侦听器方法。此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。当您使用类级别@KafkaListener实例时,这也很有用,其中必须已经转换了负载以确定要调用的方法。以下示例创建使用此方法的bean:

@Bean
public KafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}
@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

请注意,若要执行此操作,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,例如:

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

请注意,您仍然可以访问批处理标题。
如果批处理转换器具有支持它的记录转换器,则还可以接收根据泛型类型转换有效负载的消息列表。下面的示例演示了如何执行此操作:

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
    ...
}

定制ConversionService

从版本2.1.1开始,默认的o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory使用org.springframework.core.convert.ConversionService解析调用侦听器方法的参数随实现以下任何接口的所有bean一起提供:

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

这允许您进一步自定义侦听器反序列化,而不必更改ConsumerFactoryKafkaListenerContainerFactory的默认配置。

通过在KafkaListenerEndpointRegistrar上的KafkaListenerConfigurer bean设置自定义MessageHandlerMethodFactory将禁用此功能。

相关文章