在kafka消息负载中添加时间戳

du7egjpx  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(484)

有什么方法可以在kafka消息负载中添加时间戳头吗?我想检查消息是何时在使用者端创建的,并在此基础上应用自定义逻辑。
编辑:
我正试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,以便能够在特定的时间段内使用消息。现在,Kafka只确保消息将按照它们放入队列的顺序传递。但在我的例子中,先前生成的记录可能在某个延迟之后到达(因此,在时间t1生成的消息的偏移量1可能高于在稍后时间t2生成的偏移量为0的消息)。由于这个原因,他们将不会在顺序,我期望在消费者的结束。所以我基本上是在寻找一种方法来有序地消费它们。
当前的kafka0.8版本除了在生产者端附加“消息密钥”之外,没有其他方法,在这里发现了一个类似的主题,建议在消息负载中对其进行编码。但我做了很多搜索,但找不到可能的方法。
另外,我不知道这种方法是否会对kafka的整体性能产生任何影响,因为它在内部管理消息偏移量,而且从本页可以看出,目前还没有公开这样的api
真的很感激任何线索,如果这在所有正确的方式,我的想法或如果有任何可能的方法,我都准备给它一个尝试

ecfdbz9o

ecfdbz9o1#

这看起来会帮助你实现你的目标。它允许您轻松地定义和编写消息头,从而隐藏(反)序列化负担。您唯一需要提供的是一个(反)序列化程序,用于通过连接发送的实际对象。这个实现实际上尽可能地延迟有效负载对象的反序列化过程,这意味着您可以(以一种非常高效和透明的方式)反序列化报头,检查时间戳,并且只有在确定对象对您有用时才反序列化有效负载(重位)。

58wvjzkj

58wvjzkj2#

如果您想在特定的时间段内使用消息,那么我可以为您提供一个解决方案,但是从该时间段以有序的方式使用消息是很困难的。我也在寻找同样的解决方案。检查下面的链接
kafka队列中的消息排序
获取特定时间数据的解决方案
对于时间t1,t2,…tn,其中t是时间范围;把主题分成n个分区。现在使用partitioner类生成消息的方式是,应该使用消息生成时间来决定该消息应该使用哪个分区。
类似地,在使用时,为您要使用的时间范围订阅确切的分区。

wqsoz72f

wqsoz72f3#

您可以创建一个包含分区信息和创建此消息时的时间戳的类,然后将其用作kafka消息的键。然后可以使用 Package 器serde将此类转换为字节数组并返回,因为kafka只能理解字节。然后,当您在消费端接收到一个字节包形式的消息时,您可以反序列化它并检索时间戳,然后将其传递到您的逻辑中。
例如:

public class KafkaKey implements Serializable {
    private long mTimeStampInSeconds;
    /* This contains other partitioning data that will be used by the
    appropriate partitioner in Kafka. */
    private PartitionData mPartitionData;

    public KafkaKey(long timeStamp, ...) {
        /* Initialize key */
        mTimeStampInSeconds = timestamp;
    }

    /* Simple getter for timestamp */
    public long getTimeStampInSeconds() {
        return mTimeStampInSeconds;
    }

    public static byte[] toBytes(KafkaKey kafkaKey) {
        /* Some serialization logic. */
    }

    public static byte[] toBytes(byte[] kafkaKey) throws Exception {
        /* Some deserialization logic. */
    }
}

/* Producer End */

KafkaKey kafkaKey = new KafkaKey(System.getCurrentTimeMillis(), ... );
KeyedMessage<byte[], byte[]> kafkaMessage = new KeyedMessage<>(topic, KafkaKey.toBytes(kafkaKey), KafkaValue.toBytes(kafkaValue));

/* Consumer End */
MessageAndMetadata<byte[],byte[]> receivedMessage = (get from consumer);
KafkaKey kafkaKey = KafkaKey.fromBytes(receivedMessage.key());

long timestamp = kafkaKey.getTimeStampInSeconds();
/*
 * And happily ever after */

这将比使特定分区对应于时间间隔更灵活。否则,您必须不断地为不同的时间范围添加分区,并保持一个单独的、同步的表格,显示哪个分区对应于哪个时间范围,这可能会很快变得不方便。

g2ieeal7

g2ieeal74#

注意,根据这一讨论,Kafka在消息的内部表示中引入了时间戳:https://cwiki.apache.org/confluence/display/kafka/kip-32+-+add+timestamps+to+kafka+message
还有这些票:https://issues.apache.org/jira/browse/kafka-2511
它应该可以在Kafka的所有版本 0.10.0.0 更大。
这里的问题是,您以不再需要的顺序接收消息。如果顺序很重要,那么您需要放弃相关生产者中的并行性。然后消费者层面的问题就消失了。

相关问题