Flink算子(Map,FlatMap,Filter)

x33g5p2x  于2021-03-14 发布在 Flink  
字(2.0k)|赞(0)|评价(0)|浏览(580)

Map

DataStreamSource<T> 转换 SingleOutputStreamOperator<T>

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
        SingleOutputStreamOperator<Integer> result = streamSource.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer item) throws Exception {
                return item * 1;
            }
        });
        result.print();
        env.execute("WordCountStreamingJob");
    }
}

比如,传入的是一个整型的集合,传出的是另一个整型的集合。一比一对应。

简化版:

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
        SingleOutputStreamOperator<Integer> result = streamSource.map((MapFunction<Integer, Integer>) item -> item * 1);
        result.print();
        env.execute("WordCountStreamingJob");
    }
}

FlatMap

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
        streamSource.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer item, Collector<Integer> out) throws Exception {
                // 复制十份
                for (int i = 0; i < 10; i++) {
                    out.collect(item);
                }
            }
        }).print();
        env.execute("WordCountStreamingJob");
    }
}

传入一个集合,把每个集合复制十份,再输出一个集合。

Filter

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
        streamSource.filter((FilterFunction<Integer>) item -> item > 5).print();
        env.execute("WordCountStreamingJob");
    }
}

返回一个布尔值,来过滤数据。

上一篇:kafka的Source
下一篇:Flink算子KeyBy

相关文章