flink中的windowall操作符是否将并行化缩小到1?

icomxhvb  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(508)

我在flink中有一个流,它从一个源发送多维数据集,对多维数据集进行转换(向多维数据集中的每个元素添加1),然后最后将其发送到下游以打印每秒的吞吐量。
流在4个线程上并行化。
如果我理解正确 windowAll 运算符是非并行变换,因此应该将并行化缩小到1,并将其与 TumblingProcessingTimeWindows.of(Time.seconds(1)) ,将最近一秒内所有并行子任务的吞吐量相加并打印。我不确定是否得到正确的输出,因为每秒的吞吐量是这样打印的:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...

问题:流打印机是打印每个线程(1、2、3和4)的吞吐量,还是仅选择线程3打印所有子任务的吞吐量总和?
当我在开始时将环境的并行度设置为1时 env.setParallelism(1) ,我没有得到吞吐量之前的“x>”,但我似乎得到了与设置为4时相同(甚至更好)的吞吐量。这样地:

45
429
499
505
1
503
524
530
...

以下是程序的代码片段:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new CubeSource());

        DataStream<Cube> adder = start
                .map(new MapFunction<Cube, Cube>() {
                    @Override
                    public Cube map(Cube cube) throws Exception {
                        return cube.cubeAdd(1);
                    }
                });

        DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow tw,
                                      Iterable<Cube> values,
                                      Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Cube c : values)
                            sum++;
                        out.collect(sum);
                    }
                });
        throughput.print();
        env.execute("Cube Stream of Sweetness");
    }
}
a5g8bdjr

a5g8bdjr1#

如果环境的并行度设置为3,并且您使用的是windowall操作符,则只有window操作符以并行度1运行。Flume仍将以平行度3运行。因此,计划如下:

In_1 -\               /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/               \- Out_3

windowall操作符使用循环策略将其输出发送到后续任务。这就是不同线程发出程序结果记录的原因。
当您将环境并行度设置为1时,所有操作符都与单个任务一起运行。

相关问题