Apache Kafka提供了一个高级API,用于序列化和反序列化记录值及其密钥。它存在于org.apache.kafka.common.serialization.Serializer<T>
和org.apache.kafka.common.serialization.Deserializer<T>
抽象类中,其中包含一些内置实现。同时,我们可以使用Producer
或Consumer
配置属性指定序列化程序和反序列化程序类。下面的示例演示了如何执行此操作:
props.put(ConsumerConfig.KEY\_DESERIALIZER\_CLASS\_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE\_DESERIALIZER\_CLASS\_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY\_SERIALIZER\_CLASS\_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE\_SERIALIZER\_CLASS\_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,KafkaConsumer
提供重载构造函数,分别接受key
和value
的序列化和反序列化实例。
当您使用此API时, DefaultKafkaProducerFactory
andDefaultKafkaConsumerFactory
还提供属性(通过构造函数或setter方法)以将自定义序列化和反序列化实例注入目标Producer
或Consumer
。此外,还可以通过构造函数传入Supplier<Serializer>
或Supplier<Deserializer>
实例-这些Supplier
在创建每个Producer
或Consumer
时被调用。
Spring Kafka还提供了基于Jackson JSON对象映射器的JsonSerializer
和JSONderializer
实现。JsonSerializer
允许以JSONbyte[]
的形式编写任何Java对象。JsonDeserializer
需要一个Class<?> targetType
参数,允许将使用的byte[]
反序列化为正确的目标对象。下面的示例演示如何创建JsonDeserializer
:
JsonDeserializer thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用ObjectMapper
自定义JsonSerializer
和JsonDeserializer
。您还可以扩展它们以实现配置中的某些特定配置逻辑configure(Map<String, ?> configs, boolean isKey)
方法。
从2.3版开始,默认情况下,所有支持JSON的组件都配置有一个JacksonUtils.enhancedObjectMapper()
实例,该实例带有MapperFeature.DEFAULT_VIEW_INCLUSION
和DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
功能禁用。此外,这样的实例还提供了用于自定义数据类型的众所周知的模块,例如Java时间和Kotlin支持。有关更多信息,请参见JacksonUtils.enhancedObjectMapper()
官方文档。此方法还将org.springframework.kafka.support.JacksonMimeTypeModule
fororg.springframework.util.MimeType
对象序列化注册到纯字符串中,以便在网络上实现平台间兼容性。JacksonMimeTypeModule
可以在应用程序上下文中注册为bean,它将自动配置到Spring BootObjectMapper
instance.实例中。
同样从版本2.3开始,JsonDeserializer
提供了基于TypeReference
的构造函数,以便更好地处理目标泛型容器类型。
从版本2.1开始,可以在记录Headers
中传递类型信息,从而允许处理多个类型。此外,可以使用以下Kafka属性配置序列化程序和反序列化程序:
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认true
): 可以将其设置为false
以禁用JsonSerializer
上的此功能(设置addTypeInfo
属性)。
JsonSerializer.TYPE_MAPPINGS
(默认empty
): 请参见Mapping Types。
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认true
): 可以将其设置为false
以忽略序列化设置的Headers
。
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认true
):可以将其设置为false
以保留序列化设置的Headers
。
JsonDeserializer.KEY_DEFAULT_TYPE
: 如果不存在Headers
信息,则回退反序列化的keys
。
JsonDeserializer.VALUE_DEFAULT_TYPE
: 如果不存在Headers
信息,则回退反序列化的values
。
JsonDeserializer.TRUSTED_PACKAGES
(默认java.util
,java.lang
): 允许反序列化以逗号分隔的包模式列表。*意味着全部反序列化。
JsonDeserializer.TYPE_MAPPINGS
(默认empty
): 请参见Mapping Types.
从版本2.2开始,反序列化将删除类型信息Headers
(如果由序列化添加)。通过直接在反序列化或使用前面描述的配置属性将removeTypeHeaders
属性设置为false
,可以还原为以前的行为。
当以编程方式构造序列化/反序列化以便在Producer
或Consumer
工厂中使用时,自版本2.3以来,您可以使用此API,它简化了配置。
以下示例假设您使用的是Spring Boot:
@Bean
public DefaultKafkaProducerFactory pf(KafkaProperties properties) {
Map<String, Object> props = properties.buildProducerProperties();
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(props,
new JsonSerializer<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerializer<>(MyValueType.class)
.noTypeInfo());
}
@Bean
public DefaultKafkaConsumerFactory pf(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
DefaultKafkaConsumerFactory pf = new DefaultKafkaConsumerFactory(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonSerializer<>(MyValueType.class)
.ignoreTypeHeaders());
}
从2.2版开始,在使用JSON时,现在可以通过使用前面列表中的属性来提供类型映射。以前,您必须在序列化程序和反序列化程序中自定义类型映射器。映射由逗号分隔的token:className
对列表组成。在出站时,有效负载的类名映射到相应的令牌。入站时,类型头中的标记映射到相应的类名。
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
对应的对象必须兼容
如果使用Spring Boot,可以在application.properties
(或yaml)文件中提供这些属性。下面的示例演示了如何执行此操作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单配置。对于更高级的配置(例如在序列化和反序列化中使用自定义ObjectMapper),应使用接受预构建序列化和反序列化的
Producer
或Consumer
工厂构造函数。以下Spring引导示例覆盖默认工厂:
@Bean
public ConsumerFactory<Foo, Bar> kafkaConsumerFactory(KafkaProperties properties,
JsonDeserializer customDeserializer) {
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
customDeserializer, customDeserializer);
}
@Bean
public ProducerFactory<Foo, Bar> kafkaProducerFactory(KafkaProperties properties,
JsonSerializer customSerializer) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
customSerializer, customSerializer);
}
还提供了setter,作为使用这些构造函数的替代方法。
从版本2.2开始,您可以显式配置反序列化程序以使用提供的目标类型,并通过使用具有布尔useHeadersIfPresent
(默认情况下为true
)的重载构造函数之一忽略头中的类型信息。下面的示例演示了如何执行此操作:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
版本2.3引入了DelegatingSerializer
和DelegatingDeserializer
,它们允许生成和使用具有不同键和/或值类型的记录。生产者必须将头DelegatingSerializer.SERIALIZATION
选择器设置为用于选择要使用的序列化程序的选择器值;如果未找到匹配项,则引发IllegalStateException
。
对于传入记录,反序列化程序使用相同的头来选择要使用的反序列化程序;如果找不到匹配项或头不存在,则返回byte[]
。
您可以通过构造函数配置选择器到序列化/反序列化的映射,也可以使用DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG
通过Producer
或Consumer
属性配置它。对于序列化,Producer
属性可以是Map<String,Object>
,其中key
是选择器,value
是序列化实例、序列化类或类名。属性也可以是逗号分隔的映射项字符串,如下所示。
对于反序列化程序,Consumer
属性可以是Map<String,Object>
,其中key
是选择器,value
是反序列化实例、反序列化类或类名。属性也可以是逗号分隔的映射项字符串,如下所示。
要使用属性进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,Producer
将DelegatingSerializer.SERIALIZATION_SELECTOR
头设置为thing1
或thing2
。
RetryingDeserializer
使用委托Deserializer
和RetryTemplate
在反序列化过程中,当委托可能出现暂时性错误(如网络问题)时,重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
请参阅spring-retry了解带有重试策略、后退策略等RetryTemplate
的配置。
尽管Serializer
和Deserializer
的API从低级Consumer
andProducer
的角度来看是非常简单和灵活的, 但是在使用@KafkaListener
或Spring 集成时,您可能需要在Spring消息传递级别具有更大的灵活性。为了让您能够轻松地转换到org.springframework.messaging.Message,Kafka提供了一个带有带有MessagingMessageConverter
实现及其JsonMessageConverter
(和子类)定制的MessageConverter
抽象。通过使用@KafkaListener.containerFactory()
属性的AbstractKafkaListenerContainerFactory
bean定义,您可以将MessageConverter
直接注入到KafkaTemplate
实例中。下面的示例演示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<?, ?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用@KafkaListener
时,参数类型将提供给消息转换器,以帮助进行转换。
只有在方法级别声明了
@KafkaListener
批注时,才能实现此类型推断。对于类级别@KafkaListener
,有效负载类型用于选择要调用的@KafkaHandler
方法,因此在选择该方法之前,必须已对其进行了转换。
在
consumer
端,可以配置JsonMessageConverter
;它可以处理byte[]
,Bytes
和String
类型的ConsumerRecord
,因此应该与ByteArrayDeserializer
、byteDeserializer
或StringDeserializer
一起使用。(byte[]
和Bytes
更有效,因为它们避免了不必要的byte[]
到String
转换)。如果愿意,还可以配置与反序列化对应的JsonMessageConverter
的特定子类。
在producer
方面,当您使用KafkaTemplate.send(Message<?> message)
方法(请参见UsingKafkaTemplate
),必须配置与已配置的Kafka序列化兼容的消息转换器。
带有
StringSerializer
的StringJsonMessageConverter
带有
BytesSerializer
的BytesJsonMessageConverter
带有
ByteArraySerializer
的ByteArrayJsonMessageConverter
同样,使用byte[]
或Bytes
更有效,因为它们避免了String
到byte[]
的转换。
为了方便起见,从2.3版开始,框架还提供了一个StringOrBytesSerializer
,它可以序列化所有三种value
类型,以便可以与任何消息转换器一起使用。
从2.1.1版开始,您可以将JSON转换为Spring数据投影接口,而不是具体的类型。这允许对数据进行非常选择性和低耦合的绑定,包括从JSON文档中的多个位置查找值。例如,以下接口可以定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于在接收到的JSON文档中查找属性名作为字段。@JSON Path
表达式允许自定义值查找,甚至可以定义多个JSON路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用配置了相应委托转换器的ProjectingMessageConverter
(用于出站转换和转换非投影接口)。还必须将spring-data:spring-data-commons
和com.jayway.json path:json path
添加到类路径中。
ErrorHandlingDeserializer
当反序列化无法反序列化消息时,Spring无法处理该问题,因为它发生在poll()
返回之前。为了解决这个问题,版本2.2引入了ErrorHandlingDeserializer2
。此反序列化委托给真正的反序列化(key 或 value)。如果委托未能反序列化记录内容,则ErrorHandlingDeserializer2
在包含原因和原始字节的头中返回空值和反序列化异常。当您使用记录级MessageListener
时,如果ConsumerRecord
包含key或value的反序列化异常头,则使用失败的ConsumerRecord
调用容器的错误处理程序。记录不会传递给侦听器。
或者,您可以将ErrorHandlingDeserializer2
配置为通过提供failedDeserializationFunction
来创建自定义值,failedDeserializationFunction
是函数Function<FailedDeserializationInfo, T>
。这个函数被调用来创建一个T
的实例,该实例以通常的方式传递给监听器。FailedDeserializationInfo
类型的对象,包含提供给函数的所有上下文信息。您可以在头文件中找到反序列化异常(作为序列化的Java对象)。有关更多信息,请参阅ErrorHandlingDeserializer2
的Java文档
使用
BatchMessageListener
时,必须提供failedDeserializationFunction
。否则,该批记录不是类型安全的。
您可以使用DefaultKafkaConsumerFactory
构造函数,该构造函数接受key和valueDeserializer
对象,并连接使用合适的委托配置的ErrorHandlingDeserializer2
实例.或者,可以使用Consumer
配置属性(由ErrorHandlingDeserializer
使用)来实例化委托。属性名为ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS
和ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS
。属性值可以是类或类名。下面的示例演示如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
下面的示例使用failedDeserializationFunction
.
public class BadFoo extends Foo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedFooProvider.class);
...
使用批处理侦听器容器工厂时,还可以在BatchMessagingMessageConverter
中使用JsonMessageConverter
来转换批处理消息。有关详细信息,请参阅序列化、反序列化、消息转换和消息转换
默认情况下,转换的类型是从侦听器参数推断出来的。如果使用DefaultJackson2TypeMapper
配置JsonMessageConverter
,并将其TypePrecedence
设置为TYPE_ID
(而不是默认的INFERRED
),则转换器将使用headers(如果存在)中的类型信息。例如,这允许用接口而不是具体类来声明侦听器方法。此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。当您使用类级别@KafkaListener
实例时,这也很有用,其中必须已经转换了负载以确定要调用的方法。以下示例创建使用此方法的bean:
@Bean
public KafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,若要执行此操作,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,例如:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理标题。
如果批处理转换器具有支持它的记录转换器,则还可以接收根据泛型类型转换有效负载的消息列表。下面的示例演示了如何执行此操作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
...
}
ConversionService
从版本2.1.1开始,默认的o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory
使用org.springframework.core.convert.ConversionService
解析调用侦听器方法的参数随实现以下任何接口的所有bean一起提供:
org.springframework.core.convert.converter.Converter
org.springframework.core.convert.converter.GenericConverter
org.springframework.format.Formatter
这允许您进一步自定义侦听器反序列化,而不必更改ConsumerFactory
和KafkaListenerContainerFactory
的默认配置。
通过在
KafkaListenerEndpointRegistrar
上的KafkaListenerConfigurer
bean设置自定义MessageHandlerMethodFactory
将禁用此功能。
内容来源于网络,如有侵权,请联系作者删除!