Flink滚动窗口

x33g5p2x  于2021-03-14 发布在 Flink  
字(2.3k)|赞(0)|评价(0)|浏览(573)
public class TumblingWindowAll {

    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT);
        AllWindowedStream<Integer, TimeWindow> timeWindowAll = mapped.timeWindowAll(Time.seconds(5));
        SingleOutputStreamOperator<Integer> summed = timeWindowAll.sum(0);
        summed.print();
        env.execute("TumblingWindowAll");
    }
}

窗口大小是5秒,时间间隔是5秒。换句话说,就是每隔五秒,统计五秒内的结果。

public class TumblingWindow {

    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> {
            String[] data = item.split(" ");
            return Tuple2.of(data[0], Integer.valueOf(data[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0);
        WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.timeWindow(Time.seconds(5));
        SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1);
        summed.print();
        env.execute("TumblingWindow");
    }
}

分组后,每隔五秒,统计一次各个分组的情况。若某些分组数据无变化,则不打印无变化的分组。


public class TumblingProcessingTimeWindowsTest {

    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> {
            String[] data = item.split(" ");
            return Tuple2.of(data[0], Integer.valueOf(data[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0);
        WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1);
        summed.print();
        env.execute("TumblingWindow");
    }
}
上一篇:计数窗口
下一篇:滑动窗口

相关文章