文章23 | 阅读 10349 | 点赞0
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class);
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), TypeInformation.of(Long.TYPE));
查看并行度
System.out.println(streamSource.getParallelism()); // 并行度是 12
可以设置并行度
streamSource.setParallelism(4);
1到100的序列
DataStreamSource<Long> streamSource = env.generateSequence(1, 100);
generateSequence得到的DataStreamSource的并行度是12。
以上的Source都是玩具,实际上不会使用的。
env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
readTextFile的并行度是12。
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
System.out.println(streamSource.getParallelism());
SingleOutputStreamOperator<Tuple2> wordAndOne = streamSource.flatMap((FlatMapFunction<String, Tuple2>) (line, out) -> {
for (String word : line.split(" ")) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
System.out.println(wordAndOne.getParallelism());
wordAndOne.keyBy(0).sum(1).print();
env.execute("WordCountStreamingJob");
}
}
综上所述:
readTextFile、fromParallelCollection、generateSequence它们的并行度是跟可用的逻辑核数是相关的。可以多并行的。
readTextFile也不是可以一直运行的。socketTextStream是可以一直运行的,但是并行度是1。
内容来源于网络,如有侵权,请联系作者删除!