如何配置kafka密钥序列化程序,有时密钥很长,有时密钥是字符串?

zour9fqk  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(431)

我有kafka producer配置,直到这次我还是以字符串类型发送密钥,并配置了如下的密钥序列化程序,

configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());

但是我们有一个要求,某个时间键可能也很长,所以不需要创建另一个producer config,我们是否可以像objectserializer那样配置上面的东西?有这样的礼物吗?
我的SpringKafka版本是2.1,所以我不能使用delegatingserializer,也不能升级项目中的版本。
更新1:
我创建了如下的自定义序列化程序,并将其配置为密钥序列化程序,但是在发布了作为密钥的长值的消息之后,当我看到该密钥包含一些垃圾符号时,如果其中有任何错误,是否有人可以更正下面的代码。

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

public class LongStringSerializer implements Serializer<Object> {

    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null)
            return null;

        if (data instanceof Long) {
            return serialize(Long.parseLong(data.toString()));
        }

        if (data instanceof String) {
            return serializeStringData((String) data);
        }
        return new byte[0];
    }

    private byte[] serializeStringData(String data) {
        try {
            return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    private byte[] serialize(Long data) {
        if (data == null)
            return null;

        return new byte[] {
                (byte) (data >>> 56),
                (byte) (data >>> 48),
                (byte) (data >>> 40),
                (byte) (data >>> 32),
                (byte) (data >>> 24),
                (byte) (data >>> 16),
                (byte) (data >>> 8),
                data.byteValue()
        };
    }

    @Override
    public void close() {
        // nothing to do
    }
}
ejk8hzay

ejk8hzay1#

请参阅委托序列化程序。
版本2.3引入了delegatingserializer和delegatingdeserializer,它们允许生成和使用具有不同键和/或值类型的记录。生产者必须将标头delegatingserializer.serialization选择器设置为用于选择要使用的序列化程序的选择器值;如果未找到匹配项,则抛出illegalstateexception。
它需要一个头来告诉它要使用哪个序列化程序,但是您可以编写一个更简单的版本,只查看键类型。
(我想我会加强 DelegatingSerializer 这样做)。
另一种选择是使用不同的生产者。
使用版本2.5,可以覆盖 KafkaTemplate :

/**
     * Create an instance using the supplied producer factory and properties, with
     * autoFlush false. If the configOverrides is not null or empty, a new
     * {@link DefaultKafkaProducerFactory} will be created with merged producer properties
     * with the overrides being applied after the supplied factory's properties.
     * @param producerFactory the producer factory.
     * @param configOverrides producer configuration properties to override.
     * @since 2.5
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {

所以只需定义一个不同的 KafkaTemplate 每种类型。
对于旧版本,您需要两个配置不同的生产工厂。

相关问题