flink应用程序没有接收和处理关闭时从kinesis连接器生成的事件

i1icjdpr  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(282)

问题:flink应用程序没有接收和处理kinesis连接器关闭时(由于重新启动)生成的事件
我们有下面的flink env设置

env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause); 
env.getCheckpointConfig().setCheckpointTimeout(timeOut); 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

动觉具有以下初始形态

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "LATEST");

有趣的是,当我改变动觉配置来回复事件时,例如。

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "TRIM_HORIZON");

flink正在从kinesis接收所有缓冲记录(包括flink应用程序关闭之前、期间和之后生成的事件),并对其进行处理。因此,这种行为违反了flink应用程序的“恰好一次”属性。
有人能指出我遗漏的一些明显的东西吗?

bogh5gae

bogh5gae1#

flink kinesis连接器确实将碎片序列号存储在状态中,以便只进行一次处理。
根据您的描述,在您的作业“restart”中,检查点状态似乎不受尊重。
首先要消除一个显而易见的问题:您的作业是如何从重启中恢复的?您是从保存点恢复,还是从以前的检查点自动重新启动?

相关问题