ApacheKafka根据其值对窗口消息进行排序

aydmsdu9  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(355)

我正在试图找到一种方法来重新排序主题分区中的消息,并将已排序的消息发送到新的主题。
我有一个kafka发行者,它发送以下格式的字符串消息: {system_timestamp}-{event_name}?{parameters} 例如:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

另外,我们为每条消息添加一些消息键,将它们发送到相应的分区。
我想做的是根据消息的{system timestamp}部分在1分钟的窗口内对事件重新排序,因为我们的发布者不保证消息将按照{system timestamp}值发送。
例如,我们可以先向主题传递一条{system timestamp}值更大的消息。
我研究了kafka流api,发现了一些有关消息窗口和聚合的示例:

Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

 KStreamBuilder builder = new KStreamBuilder();
 KStream<String, String> stream = builder.stream("events");
 KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.

    /* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
                () -> "",  // initial value
                (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
                TimeWindows.of(1000), // intervals in milliseconds
                Serdes.String(), // serde for aggregated value
                "test-store"
        );*/

但是我接下来该怎么处理这个分组流呢?我没有看到任何可用的'sort()(e1,e2)->e1.compareto(e2)'方法,windows也可以应用于aggregation()、reduce()、count()等方法,但我认为我不需要任何消息数据操作。
如何在1分钟窗口中重新排序邮件并将其发送到其他主题?

o75abkj4

o75abkj41#

下面是我如何在我的项目中排序流。
已创建具有源、处理器、接收器的拓扑。
处理器内
进程(键,值)->将每条记录添加到列表(示例变量)。
init()->schedule(window\u buffer\u time,wall\u clock\u time)->标点符号(timestamp)在list(instance variable)中对window buffer time的项目列表进行排序,并进行迭代和转发。清除列表(示例变量)。
这个逻辑对我很有效。

bgtovc5b

bgtovc5b2#

下面是一个提纲:
创建一个处理器实现:
in process()方法,对于每条消息:
从消息值读取时间戳
将(timestamp,message key)对作为键,将消息值作为值插入keyvaluestore。注意:这还提供了重复数据消除。您需要提供一个定制的serde来序列化密钥,以便时间戳排在第一位(按字节排列),以便范围内的查询首先按时间戳排序。
在标点()方法中:
使用从0到时间戳-60'000(=1分钟)的范围内的获取读取存储
使用context.forward()按顺序发送提取的消息,并从存储中删除它们
这种方法的问题是,如果没有新的msg到达以提前“流时间”,那么就不会触发sparture()。如果这种情况存在风险,您可以创建一个外部计划程序,向每个()发送周期性的“勾选”消息你的主题的分区,你的处理器应该忽略它,但是它们会导致标点符号在没有“真正的”msg时触发。kip-138将通过添加对系统时间标点的明确支持来解决此限制:https://cwiki.apache.org/confluence/display/kafka/kip-138%3a+change+punctuate+semantics

相关问题