Flink Operators: Reduce, Max, and Min

x33g5p2x, published 2021-03-14 in Flink

reduce: to decrease, lower, shrink, cut, trim, compress, simplify, streamline ...

In short, reduce means turning many into few.

public interface ReduceFunction<T> extends Function, Serializable {
	T reduce(T value1, T value2) throws Exception;
}

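The reduce method takes two values of the same type, value1 and value2, and returns a single value of that type. Two values go in and one comes out, which is exactly the "reduce" idea. On a keyed stream, value1 is the result accumulated so far for the key and value2 is the newly arrived element.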
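To make that contract concrete outside of Flink, here is a minimal plain-Java sketch. It assumes `java.util.function.BinaryOperator<T>` as a stand-in for `ReduceFunction<T>` (same two-in, one-out shape, without the `throws Exception`), and the class and method names `RollingReduceSketch` and `rollingReduce` are made up for illustration. It mimics a rolling reduce: the first element is emitted unchanged, and every later element is combined with the previous result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative only: not Flink code. BinaryOperator<T> stands in for ReduceFunction<T>.
final class RollingReduceSketch {

    /** Emits one result per input element: the running reduction so far. */
    static <T> List<T> rollingReduce(List<T> input, BinaryOperator<T> reducer) {
        List<T> emitted = new ArrayList<>();
        T current = null;
        boolean first = true;
        for (T value : input) {
            // The first element passes through; later ones are merged with the running result.
            current = first ? value : reducer.apply(current, value);
            first = false;
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> results = rollingReduce(List.of(1, 2, 3, 4), (value1, value2) -> value1 + value2);
        System.out.println(results); // prints [1, 3, 6, 10]
    }
}
```

Each output has the same type as the inputs, which is why the interface needs only one type parameter.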


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry");
        // Pair each word with a count of 1; returns(...) supplies the type information a lambda erases.
        DataStream<Tuple2<String, Integer>> mapped = words
                .map(item -> Tuple2.of(item, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word (f0), then keep a running sum of the counts (f1) per key.
        // A KeySelector replaces the index-based keyBy(0), which newer Flink versions deprecate.
        DataStream<Tuple2<String, Integer>> counts = mapped
                .keyBy(value -> value.f0)
                .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));
        counts.print();
        env.execute("WordCountStreamingJob");
    }
}

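In this example, the reduce function receives two elements of type Tuple2<String,Integer> and returns one element of type Tuple2<String,Integer>.

Because the stream is keyed, both inputs always have the same Tuple2.f0. Their Tuple2.f1 values are added together and the sum is returned.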
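As a rough illustration of what the keyed reduce emits, the following plain-Java sketch simulates it without a Flink runtime. The `HashMap` plays the role of per-key state, and the names `KeyedWordCountSketch` and `rollingCounts` are hypothetical. Results are formatted as `(word,count)` to resemble how a Tuple2 prints. In a real job with parallelism greater than 1, the order across different keys may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: simulates keyBy(word) followed by a rolling sum, one output per input.
final class KeyedWordCountSketch {

    static List<String> rollingCounts(List<String> words) {
        Map<String, Integer> stateByKey = new HashMap<>(); // last reduced count per key
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            // merge stores 1 for an unseen key, otherwise adds 1 to the stored count.
            int count = stateByKey.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + count + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = List.of("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry");
        // prints (Apollo,1) (Paul,1) (Tom,1) (Paul,2) (Apollo,2) (Tom,2) (Marry,1), one per line
        rollingCounts(words).forEach(System.out::println);
    }
}
```

Note that every input produces an output: the operator does not wait for the stream to end before emitting a count.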


Max and Min: aggregate functions applied after grouping.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");
        // Parse "name number" into (name, number), splitting each line only once.
        DataStream<Tuple2<String, Integer>> mapped = lines
                .map(item -> {
                    String[] parts = item.split(" ");
                    return Tuple2.of(parts[0], Integer.valueOf(parts[1]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the name (f0), then track the running maximum of field 1 for each key.
        DataStream<Tuple2<String, Integer>> result = mapped
                .keyBy(value -> value.f0)
                .max(1);
        result.print();
        env.execute("WordCountStreamingJob");
    }
}

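The max(1) call computes the running maximum of the second field (index 1) for each key. min(1) is used the same way and yields the running minimum. Both guarantee only the aggregated field; to get the whole element that holds the extreme value, Flink provides maxBy and minBy.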
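The sketch below simulates the per-key running max and min on the same input in plain Java, with no Flink dependency. It is an approximation under stated assumptions: the names `KeyedMaxMinSketch` and `rollingAggregate` are hypothetical, a `HashMap` stands in for keyed state, and `Integer::max` or `Integer::min` stands in for the built-in aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Illustrative only: simulates keyBy(name) followed by a rolling max or min on the number.
final class KeyedMaxMinSketch {

    static List<String> rollingAggregate(List<String> lines, BinaryOperator<Integer> aggregator) {
        Map<String, Integer> stateByKey = new HashMap<>(); // current extreme value per key
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            String key = parts[0];
            int value = Integer.parseInt(parts[1]);
            // merge stores the value for an unseen key, otherwise combines it with the stored one.
            int current = stateByKey.merge(key, value, aggregator);
            emitted.add("(" + key + "," + current + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");
        // max: [(Apollo,1), (Paul,3), (Tom,2), (Paul,3), (Apollo,2), (Tom,4), (Marry,6)]
        System.out.println(rollingAggregate(lines, Integer::max));
        // min: [(Apollo,1), (Paul,3), (Tom,2), (Paul,1), (Apollo,1), (Tom,2), (Marry,6)]
        System.out.println(rollingAggregate(lines, Integer::min));
    }
}
```

For the key Paul, the second record (value 1) does not lower the maximum, so the max trace repeats (Paul,3), while the min trace drops to (Paul,1).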
