Flink滑动窗口

x33g5p2x  于2021-03-14 发布在 Flink  
字(2.5k)|赞(0)|评价(0)|浏览(497)
/**
 * Demo of a sliding window over a non-keyed stream: every line read from the
 * socket is parsed as an integer, and the sum of each window is printed.
 */
public class SlidingWindowAll {

    // Execution environment the socket source below is registered on.
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Text lines read from 192.168.8.111:8888.
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

    /**
     * Builds the pipeline and runs the job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Each input line must be a valid integer; Integer.valueOf throws
        // NumberFormatException otherwise.
        MapFunction<String, Integer> parseInt = Integer::valueOf;
        SingleOutputStreamOperator<Integer> mapped = stream.map(parseInt).returns(Types.INT);

        // Window size 5 s, slide 1 s, applied to the whole stream (no keyBy);
        // the sum of every window is printed.
        mapped.timeWindowAll(Time.seconds(5), Time.seconds(1))
                .sum(0)
                .print();

        env.execute("SlidingWindowAll");
    }
}

mapped.timeWindowAll(Time.seconds(5), Time.seconds(1)); 第一个参数是窗口大小(5秒),第二个参数是滑动步长(1秒),即每隔一秒计算一次最近五秒内的数据。

/**
 * Demo of a sliding window over a keyed stream: every socket line of the form
 * {@code "<key> <number>"} becomes a {@code (key, number)} tuple, the stream is
 * keyed by the first field, and the second field is summed per window.
 */
public class SlidingWindow {

    // Execution environment the socket source below is registered on.
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Text lines read from 192.168.8.111:8888.
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

    /**
     * Builds the pipeline and runs the job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Tuple2 is fully parameterized (no raw types) so the compiler checks the
        // field types through the whole pipeline. The explicit returns(...) hint is
        // kept alongside the lambda, as in the original.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = stream
                .map((MapFunction<String, Tuple2<String, Integer>>) item -> {
                    // A line without a second space-separated token fails on data[1];
                    // a non-numeric second token fails in Integer.valueOf.
                    String[] data = item.split(" ");
                    return Tuple2.of(data[0], Integer.valueOf(data[1]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by tuple field 0 (the string key).
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = mapped.keyBy(0);

        // Two arguments => sliding window: size 5 s, slide 1 s.
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyed.timeWindow(Time.seconds(5), Time.seconds(1));

        // Sum tuple field 1 (the integer) within each key's window.
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = timeWindow.sum(1);
        summed.print();
        env.execute("SlidingWindow");
    }
}

上面的例子是先用keyBy分组、再对每个分组分别开滑动窗口的情况。

timeWindow方法:传入一个参数(窗口大小)时是滚动窗口;传入两个参数(窗口大小和滑动步长)时是滑动窗口。

也可以用window方法,显式传入窗口分配器(例如SlidingProcessingTimeWindows):

/**
 * Demo of a keyed sliding window built with an explicit window assigner
 * ({@code SlidingProcessingTimeWindows}) passed to {@code window(...)}: every
 * socket line of the form {@code "<key> <number>"} becomes a
 * {@code (key, number)} tuple, the stream is keyed by the first field, and the
 * second field is summed per window.
 */
public class SlidingProcessingTimeWindowsTest {

    // Execution environment the socket source below is registered on.
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Text lines read from 192.168.8.111:8888.
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

    /**
     * Builds the pipeline and runs the job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Tuple2 is fully parameterized (no raw types) so the compiler checks the
        // field types through the whole pipeline. The explicit returns(...) hint is
        // kept alongside the lambda, as in the original.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = stream
                .map((MapFunction<String, Tuple2<String, Integer>>) item -> {
                    // A line without a second space-separated token fails on data[1];
                    // a non-numeric second token fails in Integer.valueOf.
                    String[] data = item.split(" ");
                    return Tuple2.of(data[0], Integer.valueOf(data[1]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by tuple field 0 (the string key).
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = mapped.keyBy(0);

        // Explicit sliding window assigner: size 5 s, slide 1 s.
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));

        // Sum tuple field 1 (the integer) within each key's window.
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = timeWindow.sum(1);
        summed.print();
        env.execute("SlidingProcessingTimeWindowsTest");
    }
}
上一篇:滚动窗口
下一篇:Session窗口

相关文章