Flink流处理非并行的Source

x33g5p2x  于2021-03-14 发布在 Flink  
字(1.6k)|赞(0)|评价(0)|浏览(538)
StreamExecutionEnvironment.getExecutionEnvironment();

根据环境判断是本地环境还是集群环境,来创建运行环境。

DataStream<String> lines = env.socketTextStream("192.168.8.111", 8888);

DataStreamSource是DataStream的实现类。

DataStream是抽象的数据集,不实际装数据,只是数据集的描述。

通过转换方法可以被转换成其他的DataStream。

DataStreamSource<String> lines = env.fromElements();

fromElements方法,通常用来做实验的。(这只是一个玩具 ^_^ )

DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);

同理,fromCollection和fromElements方法类似,只不过它是个集合。

DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

获取并行度 [getParallelism方法]

streamSource.getParallelism()
public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14));
        /**
         * fromCollection返回的DataStreamSource并行度为1
         */
        System.out.println(streamSource.getParallelism());
        SingleOutputStreamOperator<Integer> filtered = streamSource.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });
        /**
         * filter返回的DataStreamSource并行度为12
         */
        System.out.println(filtered.getParallelism());
        filtered.print();
        env.execute("WordCountStreamingJob");
    }
}

并行度在程序执行前,程序已经知道了,以为它只是一个描述信息,已经知道了又几个并行。

env.socketTextStream("192.168.8.111", 8888); // 并行度也为1

综上所述:

socketTextStream、fromElements、fromCollection返回DataStream的并行度默认均为1。

可以通过<u>setParallelism</u>方法进行设置并行度。

filtered.setParallelism(6);

相关文章