apache flink-事件时间

wi3ka0sx  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(431)

我想为apache flink中的事件创建一个事件时钟。我是这样做的

public class TimeStampAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {

    private final long maxOutOfOrderness = 0; // 3.5 

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {

        currentMaxTimestamp = new  Date().getTime();

        return currentMaxTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {

        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);

    }

}

请检查上面的代码,告诉我是否做得正确。在事件时间和水印分配之后,我想处理stream-in-process函数,在该函数中,我将为不同的密钥收集流数据10分钟。

v2g6jxz6

v2g6jxz61#

不,这不是一个合适的实现。事件时间戳应该是确定的(即,可再现的),并且应该基于事件流中的数据。相反,如果要使用date().gettime,则或多或少要使用处理时间。
通常,在执行事件时间处理时,事件将有一个时间戳字段,时间戳提取器将返回该字段的值。
您展示的实现将失去处理事件时间的大部分好处,例如重新处理历史数据以重现历史结果的能力。

nwlqm0z1

nwlqm0z12#

您的实现是实现对flink系统的摄取时间,而不是事件时间。例如,如果您从Kafka消费,则previouselementtimestamp通常应指向事件已生成到Kafka的时间(如果Kafka制作者未说明其他内容),这将使您的流式处理具有可复制性。
如果您想在flink中实现事件时间处理,您应该使用一些与您的元素相关联的时间戳。它可以是或在元素本身内部(这对于时间序列是有意义的),也可以存储在kafka中,并且在前面的elementtimestamp下可用。
关于maxoutoforderness,您可能还想考虑flink的side output特性,它可以在创建窗口之后获取最新的元素并更新flink作业的输出。
如果您使用kafka,并且只需要简单的数据丢失事件时间处理实现,请使用ascendingtimestampextractor。ascendingtimestampextractor存在一些潜在问题,如果您的数据未在分区内排序,或者您在操作符之后而不是直接在kafkasource之后应用此提取器,则可能会出现这些问题。对于健壮的工业用例,您应该像googledataflow模型中提到的那样,将水印摄取实现到持久性日志存储中。

相关问题