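import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SlidingWindowAll {
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
    public static void main(String[] args) throws Exception {
        // Parse each line read from the socket as an integer.
        SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT);
        // Non-keyed sliding window: 5 seconds long, evaluated every 1 second.
        AllWindowedStream<Integer, TimeWindow> timeWindowAll = mapped.timeWindowAll(Time.seconds(5), Time.seconds(1));
        SingleOutputStreamOperator<Integer> summed = timeWindowAll.sum(0);
        summed.print();
        env.execute("SlidingWindowAll");
    }
}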
mapped.timeWindowAll(Time.seconds(5), Time.seconds(1));
Every second, this computes a result over the data received during the last five seconds.
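Because the slide (1 s) is shorter than the window size (5 s), the windows overlap and a single element is counted in several of them. The following is a minimal plain-Java sketch of that assignment, with no Flink dependency. The class name `SlidingWindowAssignment` and the method `windowStarts` are hypothetical, and the sketch assumes non-negative millisecond timestamps and windows aligned to multiples of the slide, each covering [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; these names are hypothetical and not part of Flink.
class SlidingWindowAssignment {

    // Returns the start times (ms) of every window [start, start + size)
    // that contains the given timestamp, newest window first.
    // Assumes timestamp >= 0 and window starts aligned to multiples of slide.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start that is not after the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5_000L;   // 5-second window
        long slide = 1_000L;  // evaluated every second
        for (long start : windowStarts(7_300L, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

With a 5-second size and a 1-second slide, each element falls into five overlapping windows, which is why the same input value contributes to five consecutive sums.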
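import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SlidingWindow {
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
    public static void main(String[] args) throws Exception {
        // Each input line is expected to look like "<word> <count>", separated by one space.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = stream.map((MapFunction<String, Tuple2<String, Integer>>) item -> {
            String[] data = item.split(" ");
            return Tuple2.of(data[0], Integer.valueOf(data[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by the word (field 0), then sum the counts (field 1) per sliding window.
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = mapped.keyBy(0);
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyed.timeWindow(Time.seconds(5), Time.seconds(1));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = timeWindow.sum(1);
        summed.print();
        env.execute("SlidingWindow");
    }
}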
This is the keyed (grouped) case: the window is applied to each key separately.
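To make the keyed behaviour concrete, here is a small plain-Java simulation of what a keyed sliding sum produces for one window. It is only a sketch: the `Event` class, the `sumsForWindow` method, and the sample data are hypothetical, timestamps are assumed to be milliseconds, and Flink itself aggregates incrementally as elements arrive rather than scanning a list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; these names are hypothetical and not part of Flink.
class KeyedSlidingSum {

    // Hypothetical event: a key, a value, and an arrival time in milliseconds.
    static final class Event {
        final String key;
        final int value;
        final long timestamp;

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Sums the values per key for events inside the window [windowStart, windowStart + size).
    static Map<String, Integer> sumsForWindow(List<Event> events, long windowStart, long size) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Event e : events) {
            if (e.timestamp >= windowStart && e.timestamp < windowStart + size) {
                sums.merge(e.key, e.value, Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("spark", 1, 500L),
                new Event("flink", 2, 1_200L),
                new Event("spark", 3, 4_800L),
                new Event("flink", 4, 6_100L));
        // Evaluate three consecutive 5-second windows that start one second apart.
        for (long start = 0L; start <= 2_000L; start += 1_000L) {
            System.out.println("[" + start + ", " + (start + 5_000L) + ") -> "
                    + sumsForWindow(events, start, 5_000L));
        }
    }
}
```

Each window yields one sum per key that has data in it, and a key with no elements in a window produces no output for that window.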
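When the timeWindow method is given one argument (the window size), it creates a tumbling window. When it is given two arguments (the window size and the slide interval), it creates a sliding window.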
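The difference between the two window types is easiest to see on the same input. The plain-Java sketch below assumes one value arrives per second and treats a tumbling window as a sliding window whose slide equals its size. The class `TumblingVsSliding` and the method `windowSums` are hypothetical, and for simplicity the sketch only evaluates windows that fit entirely inside the input, whereas Flink also fires the partially filled windows at the start of a stream.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; these names are hypothetical and not part of Flink.
class TumblingVsSliding {

    // values[i] is assumed to arrive during second i.
    // Returns the sum of each window [start, start + size) for start = 0, slide, 2 * slide, ...
    // Only windows that fit completely inside the input are evaluated.
    static List<Integer> windowSums(int[] values, int size, int slide) {
        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start + size <= values.length; start += slide) {
            int sum = 0;
            for (int i = start; i < start + size; i++) {
                sum += values[i];
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        // Tumbling: slide equals size, so windows do not overlap.
        System.out.println("tumbling: " + windowSums(values, 5, 5));
        // Sliding: slide is smaller than size, so windows overlap.
        System.out.println("sliding:  " + windowSums(values, 5, 1));
    }
}
```

The tumbling case counts every value exactly once, while the sliding case emits a result every second and reuses each value in up to five windows.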
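You can also use the window method and pass the window assigner explicitly. This is the form to prefer on newer Flink versions, because timeWindow has been deprecated since Flink 1.12: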
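import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SlidingProcessingTimeWindowsTest {
    private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = stream.map((MapFunction<String, Tuple2<String, Integer>>) item -> {
            String[] data = item.split(" ");
            return Tuple2.of(data[0], Integer.valueOf(data[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = mapped.keyBy(0);
        // Explicit assigner: processing-time sliding window, 5 seconds long, sliding every 1 second.
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = timeWindow.sum(1);
        summed.print();
        env.execute("SlidingProcessingTimeWindowsTest");
    }
}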