flink-cep模式一起工作?

wkyowqbh  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(425)

我读了apache-flink关于州生存时间的文档https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-生存时间ttl
我不明白两个时刻。
1)

StateTTLConfig ttl = StateTtlConfig
.newBuilder(Time.minutes(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

//And use in my Process Function 

valueStateDescriptor.enableTimeToLive(ttl);

如果我在15:00输入valuestate某个元素,然后用savepoint停止我的工作,只有在17:00我才会从最后一个savepoint开始我的工作。
价值状态会很清楚,对吗?
2) 如果我使用apache flink cep模式:

.begin("a")
.where(simpleConditionA)
.followedBy("b")
.where(simpleConditionB)
.within(Time.minutes(60));

如果我在15:00得到一个元素,然后用savepoint停止我的工作,只有在17:00我才会从最后一个savepoint开始我的工作。得到b元素,模式不匹配,对吗?
它(ttl)如何与apache-flink-cep模式一起工作?
谢谢。
我了解cep,我真的在利用摄取时间。我将试着解释:我用valuestate、timertime和ontimer方法中的clear state来使用process函数。我放入状态(keyedstate)中的一些元素,将计时器设置为1小时,并执行一些逻辑。基本上,值状态+定时器用作输出限制器(1小时内输出1条消息)。在我的公司中,我们需要停止在集群上运行作业(使用保存点),然后在几个小时后,我们需要从最后一个保存点重新启动作业。现在我不使用ttl,重启后,我的valuestate.value不能为null。我希望重启后在不到一个小时的valuestate.value不为null(如果我在stop之前输入state),但超过一个小时的value状态总是为null。
p、 我使用rrocksdb状态后端,增量检查点间隔为1s。它工作得很好。))

uoifb46i

uoifb46i1#

如果我在15:00输入valuestate某个元素,然后用savepoint停止我的工作,只有在17:00我才会从最后一个savepoint开始我的工作。价值状态会很清楚,对吗?
(1) 这种价值状态实际上会消失,但我不确定它是否真的会消失。如果您的状态ttl配置包括 cleanupFullSnapshot() 如果您在16:00之后获取保存点,则可以保证保存点不会包含有问题的状态。但在本例中,这两种情况似乎都不是真的,因此状态在快照中。我不知道是在快照还原期间还是在下次清理期间删除了已过期的状态。但既然你已经指定了 NeverReturnExpired ,它不会影响结果。
它(ttl)如何与apache flink cep模式一起工作?
(2) cep不使用状态ttl。只要可能影响模式匹配,cep就会保持state,一旦不再需要,就显式清除state。从您对这个问题的措辞来看,我假设您使用的是处理时间,而不是事件时间。在这种情况下,模式将不会在60分钟内匹配。但是,如果要使用事件时间,则将使用水印来确定经过了多少时间,并且停机时间对模式匹配没有影响。
更新:
我现在看到你正在使用摄取时间,并依赖计时器来清除状态。对于摄取时间,您可以选择使用事件时间或处理时间计时器。如果使用处理时间计时器,则任何本应在作业未运行时触发的计时器将在作业重新启动后立即触发。有了事件时间计时器,一旦水印到达计时器中的时间,它们就会开火。由于水印不保存在保存点中,因此在创建任何水印之前,必须对某些事件进行流动和处理(对于周期性水印,自动水印间隔必须经过)。

相关问题