有什么方法可以在kafka消息负载中添加时间戳头吗?我想检查消息是何时在使用者端创建的,并在此基础上应用自定义逻辑。
编辑:
我正试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,以便能够在特定的时间段内使用消息。现在,Kafka只确保消息将按照它们放入队列的顺序传递。但在我的例子中,先前生成的记录可能在某个延迟之后到达(因此,在时间t1生成的消息的偏移量1可能高于在稍后时间t2生成的偏移量为0的消息)。由于这个原因,他们将不会在顺序,我期望在消费者的结束。所以我基本上是在寻找一种方法来有序地消费它们。
当前的kafka0.8版本除了在生产者端附加“消息密钥”之外,没有其他方法,在这里发现了一个类似的主题,建议在消息负载中对其进行编码。但我做了很多搜索,但找不到可能的方法。
另外,我不知道这种方法是否会对kafka的整体性能产生任何影响,因为它在内部管理消息偏移量,而且从本页可以看出,目前还没有公开这样的api
真的很感激任何线索,如果这在所有正确的方式,我的想法或如果有任何可能的方法,我都准备给它一个尝试
4条答案
按热度按时间ecfdbz9o1#
这看起来会帮助你实现你的目标。它允许您轻松地定义和编写消息头,从而隐藏(反)序列化负担。您唯一需要提供的是一个(反)序列化程序,用于通过连接发送的实际对象。这个实现实际上尽可能地延迟有效负载对象的反序列化过程,这意味着您可以(以一种非常高效和透明的方式)反序列化报头,检查时间戳,并且只有在确定对象对您有用时才反序列化有效负载(重位)。
58wvjzkj2#
如果您想在特定的时间段内使用消息,那么我可以为您提供一个解决方案,但是从该时间段以有序的方式使用消息是很困难的。我也在寻找同样的解决方案。检查下面的链接
kafka队列中的消息排序
获取特定时间数据的解决方案
对于时间t1,t2,…tn,其中t是时间范围;把主题分成n个分区。现在使用partitioner类生成消息的方式是,应该使用消息生成时间来决定该消息应该使用哪个分区。
类似地,在使用时,为您要使用的时间范围订阅确切的分区。
wqsoz72f3#
您可以创建一个包含分区信息和创建此消息时的时间戳的类,然后将其用作kafka消息的键。然后可以使用 Package 器serde将此类转换为字节数组并返回,因为kafka只能理解字节。然后,当您在消费端接收到一个字节包形式的消息时,您可以反序列化它并检索时间戳,然后将其传递到您的逻辑中。
例如:
这将比使特定分区对应于时间间隔更灵活。否则,您必须不断地为不同的时间范围添加分区,并保持一个单独的、同步的表格,显示哪个分区对应于哪个时间范围,这可能会很快变得不方便。
g2ieeal74#
注意,根据这一讨论,Kafka在消息的内部表示中引入了时间戳:https://cwiki.apache.org/confluence/display/kafka/kip-32+-+add+timestamps+to+kafka+message
还有这些票:https://issues.apache.org/jira/browse/kafka-2511
它应该可以在Kafka的所有版本
0.10.0.0
更大。这里的问题是,您以不再需要的顺序接收消息。如果顺序很重要,那么您需要放弃相关生产者中的并行性。然后消费者层面的问题就消失了。