使用flink计算流中有状态实体的最新状态

20jt8wwn  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(304)

我试图在Flink创建我的第一个实时分析工作。这种方法类似于kappa架构,所以我有关于kafka的原始数据,在那里我们接收到任何实体状态的每一个变化的消息。
因此信息的形式如下:

(id,newStatus, timestamp)

我们要计算,对于每个时间窗口,给定状态下的项目数。所以输出的形式应该是:

(outputTimestamp, state1:count1,state2:count2 ...)

或同等产品。这些行在任何给定时间都应包含给定状态下的项计数,其中与id关联的状态是为该id观察到的最新消息。在任何情况下,都应计算id的状态,即使事件比正在处理的事件早得多。因此,所有计数的总和应该等于系统中观察到的不同id的数目。接下来的步骤可能会在一段时间后忘记最后一个项目中的项目,但现在这不是一个严格的要求。
这将写在elasticsearch上,然后进行查询。
我尝试了许多不同的途径,但没有一条完全符合要求。使用滑动窗口,我可以很容易地实现预期的行为,只是当滑动窗口的开始超过事件的时间戳时,它会丢失计数,正如您所料。其他方法在处理积压工作时无法保持一致,因为我对密钥和时间戳做了一些技巧,但在一次处理完数据时失败了。
所以我想知道,即使是在高水平上,我应该如何处理这个问题。它看起来像是一个相对常见的用例,但是必须无限期地保留给定id的相关信息,以便正确地计算实体,这一事实会产生很多问题。

umuewwlo

umuewwlo1#

我想我有办法解决你的问题:
给予 DataStream(id, state, time) 作为:

val stateUpdates: DataStream[(Long, Int, ts)] = ???

实际状态更改如下所示:

val stateCntUpdates: DataStream[(Int, Int)] = s // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater)
``` `StateUpdater` 是有状态的 `FlatMapFunction` . 它有一个键控状态,存储每个id的最后一个状态。对于每个输入记录,它返回两个状态计数更新记录: `(oldState, -1)` ,  `(newState, +1)` . 这个 `(oldState, -1)` 记录确保减少以前状态的计数。
接下来,根据每个状态和窗口聚合状态计数更改:

val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time)
.keyBy(_._1) // key by state
.timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling
.apply(new SumReducer(), new YourWindowFunction())
``` SumReducer 对CNT更新和 YourWindowFunction 指定窗口的时间戳。此步骤聚合窗口中每个状态的所有状态更改。
最后,我们使用计数更新来调整当前计数。

val stateCnts: DataStream[(Int, Int, Long)] = cntUpdatesPerWindow // (state, count, time)
  .keyBy(_._1) // key by state again
  .map(new CountUpdater)
``` `CountUpdater` 是有状态的 `MapFunction` . 它有一个键控状态,存储每个状态的当前计数。对于每个传入的记录,将调整计数并创建一个记录 `(state, newCount, time)` 发射。
现在您有了一个流,每个状态都有新的计数(每个状态一个记录)。如果可能,您可以使用这些记录更新elasticsearch索引。如果需要收集给定时间内的所有状态计数,可以使用窗口来完成。
请注意:此程序的状态大小取决于唯一ID的数量。如果id空间增长非常快,可能会导致问题。

相关问题