【14】Flink 之 Window(窗口)

x33g5p2x  于2021-12-25 转载在 其他  
字(5.0k)|赞(0)|评价(0)|浏览(395)

1.1、Window(窗口)

  • 聚合事件(比如计数、求和)在流上的工作方式与批处理不同。

  • 比如,对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。所以,流上的聚合需要由 window 来划定范围,比如 “计算过去的5分钟” ,或者 “最后100个元素的和” 。

  • window是一种可以把无限数据切割为有限数据块的手段

  • 窗口可以是 时间驱动的 【Time Window】(比如:每30秒)或者 数据驱动的【Count Window】 (比如:每100个元素)。

1.2、Window 的类型

  • 窗口通常被区分为不同的类型:
  1. tumbling windows:滚动窗口 【没有重叠】
  2. sliding windows:滑动窗口 【有重叠】
  3. session windows:会话窗口

1.2.1、tumbling windows(滚动窗口)

  • tumbling windows:滚动窗口 【没有重叠】

1.2.2、sliding windows(滑动窗口)

  • sliding windows:滑动窗口 【有重叠】

1.2.2、window 类型汇总

即 Time Window 和 Count Window 都可以实现 滚动窗口 或者 滑动窗口。

1.3 TimeWindow 的应用

官网代码实例如下:

1.4 CountWindow 的应用

1.5 自定义 Window 的应用

  1. 基于分组(应用代码及底层实现)

  1. 没有分组

2、Window 聚合分类

  • 增量聚合
  • 全量聚合

2.1、增量聚合

  • 窗口中每进入一条数据,就进行一次计算
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()

官网reduce(reduceFunction)例子:

官网aggregate(aggregateFunction)例子:

2.2、全量聚合

  • 全量聚合
    等属于窗口的数据到齐,才开始进行聚合计算【可以实现对窗口内的数据进行排序等需求】
apply(windowFunction)
process(processWindowFunction)
  • processWindowFunction比windowFunction提供了更多的上下文信息

全量聚合状态变化过程-求最大值

官网apply(windowFunction)例子:

官网process(processWindowFunction)例子:

3、Window代码实践

3.1、增量聚合reduce代码实现

package com.Streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Author: Henry
 * @Description: 增量聚合
 * @Date: Create in 2019/6/2 15:14
 **/
public class SocketDemoIncrAgg {

    public static void main(String[] args) throws Exception{
        //获取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9003--java");
            port = 9003;
        }

        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "master";
        String delimiter = "\n";
        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer,Integer>> intData = text.map(
                new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String value) throws Exception {
                return new Tuple2<>(1,Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
                                                           Tuple2<Integer, Integer> value2)
                            throws Exception {
                        System.out.println("执行reduce操作:"+value1+","+value2);
                        return new Tuple2<>(value1.f0,value1.f1+value2.f1);
                    }
                }).print();

        //这一行代码一定要实现,否则程序不执行
        env.execute("Socket window count");
    }
}

3.2、增量聚合reduce运行结果

每过来一条数据进行一次计算:

3.3、全量聚合process代码实现

在这里插入代码片
package com.Streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * @Author: Henry
 * @Description: window 增量聚合
 * @Date: Create in 2019/6/2 16:11
 **/
public class SocketDemoFullCount {

    public static void main(String[] args) throws Exception{
        //获取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9003--java");
            port = 9003;
        }

        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "master";
        String delimiter = "\n";

        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer,Integer>> intData = text.map(
                new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String value) throws Exception {
                return new Tuple2<>(1,Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(
                        new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context,
                                        Iterable<Tuple2<Integer, Integer>> elements,
                                        Collector<String> out)
                            throws Exception {
                        System.out.println("执行process。。。");
                        long count = 0;
                        for(Tuple2<Integer,Integer> element: elements){
                            count++;
                        }
                        out.collect("window:"+context.window()+",count:"+count);
                    }
                }).print();

        //这一行代码一定要实现,否则程序不执行
        env.execute("Socket window count");
    }
}

3.4、全量聚合process运行结果

统计接收到的数据数量:

相关文章