我是stackoverflow的新手,如果这个问题问得不好,请原谅。任何帮助/灵感都将不胜感激!
我正在使用kafka流过滤传入数据库的数据。传入的消息看起来像 {"ID":"X","time":"HH:MM"}
还有一些其他的参数,在这种情况下是不相关的。我成功地运行了一个java应用程序,它读取一个主题并打印出传入的消息。现在我要做的是使用ktables(?)将具有相同id的传入消息分组,然后使用会话窗口将表分组到时间段中。我要在时间轴上连续运行x分钟的时间窗口。
当然,第一件事是运行一个ktable来统计具有相同id的传入消息
ID Count
X 1
Y 3
Z 1
它会不断更新,因此带有过期时间戳的消息会从表中删除。
我不是百分之百确定,但我想我想要的是ktables而不是kstreams,对吗?如果这是达到我想要的结果的正确方法,我该如何实现滑动窗口呢?
这就是我现在使用的代码。它只读取主题并打印传入的消息。
private static List<String> printEvent(String o) {
System.out.println(o);
return Arrays.asList(o);
}
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(srcTopic)
.flatMapValues(value -> printEvent(value));
我想知道我要添加什么来实现上面所说的我想要的输出,以及我把它放在我的代码中的什么地方。
提前感谢您的帮助!
1条答案
按热度按时间imzjd6km1#
是的,您需要ktable和滑动窗口,我还建议您查看水印功能,以处理迟交邮件。例子