文章23 | 阅读 12131 | 点赞0
StreamExecutionEnvironment.getExecutionEnvironment();
根据环境判断是本地环境还是集群环境,来创建运行环境。
DataStream<String> lines = env.socketTextStream("192.168.8.111", 8888);
DataStreamSource是DataStream的实现类。
DataStream是抽象的数据集,不实际装数据,只是数据集的描述。
通过转换方法可以被转换成其他的DataStream。
DataStreamSource<String> lines = env.fromElements();
fromElements方法,通常用来做实验的。(这只是一个玩具 ^_^ )
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
同理,fromCollection和fromElements方法类似,只不过它是个集合。
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
获取并行度 [getParallelism方法]
streamSource.getParallelism()
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14));
/**
* fromCollection返回的DataStreamSource并行度为1
*/
System.out.println(streamSource.getParallelism());
SingleOutputStreamOperator<Integer> filtered = streamSource.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
});
/**
* filter返回的DataStreamSource并行度为12
*/
System.out.println(filtered.getParallelism());
filtered.print();
env.execute("WordCountStreamingJob");
}
}
并行度在程序执行前,程序已经知道了,以为它只是一个描述信息,已经知道了又几个并行。
env.socketTextStream("192.168.8.111", 8888); // 并行度也为1
综上所述:
socketTextStream、fromElements、fromCollection返回DataStream的并行度默认均为1。
可以通过<u>setParallelism</u>方法进行设置并行度。
filtered.setParallelism(6);
内容来源于网络,如有侵权,请联系作者删除!