如何将kstream到ktable中的记录对减少到1

5fjcxozz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(365)

这是输入Kafka主题,其中包含 ConnectionEvent :
ConnectionEvent("John", "123", "CONNECTED") ConnectionEvent("John", "123", "DISCONNECTED") ConnectionEvent("Anna", "222", "CONNECTED") ConnectionEvent("Rohan", "334", "CONNECTED") ConnectionEvent("Anna", "199", "CONNECTED") ConnectionEvent("Anna", "255", "CONNECTED") ConnectionEvent("Anna", "255", "DISCONNECTED") ConnectionEvent("Anna", "222", "DISCONNECTED") 流与还原逻辑
主题中的每个项目都使用消息键作为用户id发送。例如,“anna”。
必须按以下方式处理流:
john只有1个会话123连接和断开。所以他下线了
rohan只有一个没有断开的会话334。所以他是在线的
anna有3个会话(222199255),其中2个断开连接。所以她上网了
ktable必须具有以下数据: John Offline Rohan Online Anna Online 我尝试的是:

KTable<String, String> connectedSessions = stream.groupBy((k,v) -> v.getSessionId()) //Group by user and then by sessionId
            .reduce((agg, newVal) -> agg)  //Take latest value ie, reduce pair of records for each session to 1
            .filter(x -> x.getState == CONNECTED)  //Filter only session records which has CONNECTED has last state

但是现在,如何将复合键(user,sessionid)解组为only user,然后根据最新状态为connected的sessionid的数量将user标记为online/offline?

balp4ylt

balp4ylt1#

如果用户在线,只要他连接的事件数大于断开的事件数。因此,您可以聚合流中的连接数并检查它是否为正。比如:

KTable<String, String> connectedSessions = stream.groupByKey()
        .aggregate(
            () -> 0,
            (k, v, numberOfConnections) -> v.getState == CONNECTED ? numberOfConnections++ : numberOfConnections--)
        .mapValues((k, numberOfConnections) -> numberOfConnections > 0 ? "Online" : "Offline");

相关问题