Flink流处理可并行的Source

x33g5p2x  于2021-03-14 发布在 Flink  
字(1.4k)|赞(0)|评价(0)|浏览(328)
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class);
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), TypeInformation.of(Long.TYPE));

查看并行度

System.out.println(streamSource.getParallelism()); // 并行度是 12

可以设置并行度

streamSource.setParallelism(4);

1到100的序列

DataStreamSource<Long> streamSource = env.generateSequence(1, 100); 

generateSequence得到的DataStreamSource的并行度是12。

以上的Source都是玩具,实际上不会使用的。

env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");

readTextFile的并行度是12。

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
        System.out.println(streamSource.getParallelism());
        SingleOutputStreamOperator<Tuple2> wordAndOne = streamSource.flatMap((FlatMapFunction<String, Tuple2>) (line, out) -> {
            for (String word : line.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        System.out.println(wordAndOne.getParallelism());
        wordAndOne.keyBy(0).sum(1).print();
        env.execute("WordCountStreamingJob");
    }
}

综上所述:

readTextFile、fromParallelCollection、generateSequence它们的并行度是跟可用的逻辑核数是相关的。可以多并行的。

readTextFile也不是可以一直运行的。socketTextStream是可以一直运行的,但是并行度是1。

相关文章