kafka流和窗口化,在一个时间窗口内保持计数

ttp71kqs  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(565)

我是stackoverflow的新手,如果这个问题问得不好,请原谅。任何帮助/灵感都将不胜感激!
我正在使用kafka流过滤传入数据库的数据。传入的消息看起来像 {"ID":"X","time":"HH:MM"} 还有一些其他的参数,在这种情况下是不相关的。我成功地运行了一个java应用程序,它读取一个主题并打印出传入的消息。现在我要做的是使用ktables(?)将具有相同id的传入消息分组,然后使用会话窗口将表分组到时间段中。我要在时间轴上连续运行x分钟的时间窗口。
当然,第一件事是运行一个ktable来统计具有相同id的传入消息

ID       Count
X          1
Y          3
Z          1

它会不断更新,因此带有过期时间戳的消息会从表中删除。
我不是百分之百确定,但我想我想要的是ktables而不是kstreams,对吗?如果这是达到我想要的结果的正确方法,我该如何实现滑动窗口呢?
这就是我现在使用的代码。它只读取主题并打印传入的消息。

private static List<String> printEvent(String o) {
    System.out.println(o);
    return Arrays.asList(o);
}

final StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream(srcTopic)
    .flatMapValues(value -> printEvent(value));

我想知道我要添加什么来实现上面所说的我想要的输出,以及我把它放在我的代码中的什么地方。
提前感谢您的帮助!

imzjd6km

imzjd6km1#

是的,您需要ktable和滑动窗口,我还建议您查看水印功能,以处理迟交邮件。例子

KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream

.groupByKey()

.reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
        //where your adder can be as simple as (val, agg) -> agg + val
        //for primitive types or as complex as you need

相关问题