我有kafka producer配置,直到这次我还是以字符串类型发送密钥,并配置了如下的密钥序列化程序,
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
但是我们有一个要求,某个时间键可能也很长,所以不需要创建另一个producer config,我们是否可以像objectserializer那样配置上面的东西?有这样的礼物吗?
我的SpringKafka版本是2.1,所以我不能使用delegatingserializer,也不能升级项目中的版本。
更新1:
我创建了如下的自定义序列化程序,并将其配置为密钥序列化程序,但是在发布了作为密钥的长值的消息之后,当我看到该密钥包含一些垃圾符号时,如果其中有任何错误,是否有人可以更正下面的代码。
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.util.Map;
public class LongStringSerializer implements Serializer<Object> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, Object data) {
if (data == null)
return null;
if (data instanceof Long) {
return serialize(Long.parseLong(data.toString()));
}
if (data instanceof String) {
return serializeStringData((String) data);
}
return new byte[0];
}
private byte[] serializeStringData(String data) {
try {
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
private byte[] serialize(Long data) {
if (data == null)
return null;
return new byte[] {
(byte) (data >>> 56),
(byte) (data >>> 48),
(byte) (data >>> 40),
(byte) (data >>> 32),
(byte) (data >>> 24),
(byte) (data >>> 16),
(byte) (data >>> 8),
data.byteValue()
};
}
@Override
public void close() {
// nothing to do
}
}
1条答案
按热度按时间ejk8hzay1#
请参阅委托序列化程序。
版本2.3引入了delegatingserializer和delegatingdeserializer,它们允许生成和使用具有不同键和/或值类型的记录。生产者必须将标头delegatingserializer.serialization选择器设置为用于选择要使用的序列化程序的选择器值;如果未找到匹配项,则抛出illegalstateexception。
它需要一个头来告诉它要使用哪个序列化程序,但是您可以编写一个更简单的版本,只查看键类型。
(我想我会加强
DelegatingSerializer
这样做)。另一种选择是使用不同的生产者。
使用版本2.5,可以覆盖
KafkaTemplate
:所以只需定义一个不同的
KafkaTemplate
每种类型。对于旧版本,您需要两个配置不同的生产工厂。