Flink 以 EventTime 作为时间标准

x33g5p2x  于2021-03-14 发布在 Flink  
字(1.2k)|赞(0)|评价(0)|浏览(527)

按照数据所携带的时间来划分窗口

/**
 * Demonstrates a non-keyed event-time session window.
 *
 * <p>Reads comma-separated text lines from a socket, uses the first field of
 * each line as the event timestamp, groups all records into session windows
 * that close after a fixed gap without new events, sums the third field per
 * window and prints the result.
 *
 * <p>Expected line format: {@code <timestamp>,<word>,<count>}, for example
 * {@code 1000,spark,1}.
 */
public class EventTimeSessionWindowAll {

    /** Host of the socket text source. */
    private static final String SOURCE_HOST = "192.168.8.111";

    /** Port of the socket text source. */
    private static final int SOURCE_PORT = 8888;

    /** Gap (in seconds) without events after which a session window closes. */
    private static final long SESSION_GAP_SECONDS = 5;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Create the environment and source here rather than in static initializers,
        // so that loading this class has no side effects.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<String> stream = env.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        // Use event time (the time carried by the records) as the time standard.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Extract the time field from each record as its event time. This only attaches
        // a timestamp; the records themselves are passed on unchanged. The allowed
        // out-of-orderness is zero.
        SingleOutputStreamOperator<String> dataStream = stream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String item) {
                        // Example record: 1000,spark,1
                        String[] data = item.split(",");
                        return Long.parseLong(data[0]);
                    }
                });

        // Parse each line into a typed (timestamp, word, count) tuple. The explicit
        // returns(...) supplies the tuple's type information for the lambda.
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> mapped = dataStream
                .map((MapFunction<String, Tuple3<Long, String, Integer>>) item -> {
                    String[] data = item.split(",");
                    return Tuple3.of(Long.parseLong(data[0]), data[1], Integer.valueOf(data[2]));
                })
                .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT));

        // Non-keyed session window: all records share one window assigner, and a
        // window closes once no event arrives within the configured gap.
        AllWindowedStream<Tuple3<Long, String, Integer>, TimeWindow> eventTimeSessionWindowAll =
                mapped.windowAll(EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP_SECONDS)));

        // Sum the count field (tuple position 2) per window and print the result.
        eventTimeSessionWindowAll.sum(2).print();

        env.execute("EventTimeSessionWindowAll");
    }
}
上一篇:Session窗口

相关文章