Usage and code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism() method

Reposted by x33g5p2x on 2022-01-30 under Other

This article collects code examples of the Java method org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism() and shows how it is used. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they make useful references. Method details:
Package: org.apache.flink.streaming.api.datastream
Class: SingleOutputStreamOperator
Method: setParallelism

About SingleOutputStreamOperator.setParallelism

Sets the parallelism for this operator.
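As a minimal sketch of the basic usage (assuming Flink's DataStream API is on the classpath; the class name SetParallelismExample is made up for illustration), the operator-level override looks like this:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SetParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // default parallelism for every operator in this job

        // setParallelism() on the returned SingleOutputStreamOperator
        // overrides the environment default for this one operator only.
        SingleOutputStreamOperator<Integer> doubled = env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .setParallelism(4);

        System.out.println(doubled.getParallelism());
    }
}
```

Building the stream graph is enough to observe the setting; the job does not need to be executed.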

Code examples

Example source: apache/flink

@Override
  public DataStreamSource<T> setParallelism(int parallelism) {
    if (parallelism != 1 && !isParallel) {
      throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
    } else {
      super.setParallelism(parallelism);
      return this;
    }
  }

Example source: apache/flink

public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> input) {
  return input.map(new NoOpMapFunction())
    .setParallelism(4);
}

Example source: apache/flink

public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  SingleOutputStreamOperator<Integer> map = input
    .map(new StatefulStringStoringMap(mode, "first"))
    .setParallelism(4)
    .uid("first");
  return map;
}

Example source: apache/flink

public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  return input
    .map(new StatefulStringStoringMap(mode, "second"))
    .setParallelism(4)
    .uid("second");
}

Example source: apache/flink

public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  SingleOutputStreamOperator<Integer> map = input
    .map(new StatefulStringStoringMap(mode, "third"))
    .setParallelism(4)
    .uid("third");
  return map;
}

Example source: apache/flink

/**
 * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
 * The internal timestamps are, for example, used for event-time window operations.
 *
 * <p>If you know that the timestamps are strictly increasing you can use an
 * {@link AscendingTimestampExtractor}. Otherwise,
 * you should provide a {@link TimestampExtractor} that also implements
 * {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
 *
 * @param extractor The TimestampExtractor that is called for each element of the DataStream.
 *
 * @deprecated Please use {@link #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)}
 *             or {@link #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)}
 *             instead.
 * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
 * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
 */
@Deprecated
public SingleOutputStreamOperator<T> assignTimestamps(TimestampExtractor<T> extractor) {
  // match parallelism to input, otherwise dop=1 sources could lead to some strange
  // behaviour: the watermark will creep along very slowly because the elements
  // from the source go to each extraction operator round robin.
  int inputParallelism = getTransformation().getParallelism();
  ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
  return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
      .setParallelism(inputParallelism);
}

Example source: apache/flink

public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
  return input
    .keyBy(0)
    .countWindow(1)
    .apply(new StatefulWindowFunction(mode))
    .setParallelism(4)
    .uid("window");
}

Example source: apache/flink

private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
    DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);

    stream
        .filter(ignored -> false).setParallelism(parallelism)
          .startNewChain()
          .print().setParallelism(parallelism);
  }

Example source: apache/flink

@Test(expected = UnsupportedOperationException.class)
public void testDifferingParallelism() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // introduce dummy mapper to get to correct parallelism
  DataStream<Integer> source = env.fromElements(1, 10)
      .map(noOpIntMap);
  IterativeStream<Integer> iter1 = source.iterate();
  iter1.closeWith(iter1.map(noOpIntMap).setParallelism(parallelism / 2));
}

Example source: apache/flink

@Test(expected = UnsupportedOperationException.class)
public void testCoDifferingParallelism() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // introduce dummy mapper to get to correct parallelism
  DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);
  ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
      Integer.class);
  coIter.closeWith(coIter.map(noOpIntCoMap).setParallelism(parallelism / 2));
}

Example source: apache/flink

.setParallelism(input1.getParallelism())
    .returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
    .map(new Input2Tagger<T1, T2>())
    .setParallelism(input2.getParallelism())
    .returns(unionType);

Example source: apache/flink

@Test(expected = UnsupportedOperationException.class)
public void testForwardFailsHightToLowParallelism() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // this does a rebalance that works
  DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
  // this doesn't work because it goes from 3 to 1
  src.forward().map(new NoOpIntMap()).setParallelism(1);
  env.execute();
}
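The test above fails because forward() requires equal parallelism on both sides of the edge. As a hedged sketch of the usual fix (the class name ParallelismChangeExample is made up for illustration), an explicit repartitioning such as rebalance() lets the parallelism change legally:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismChangeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Upstream map runs at parallelism 3, the downstream sink at 1.
        // forward() would be rejected here because the parallelisms differ;
        // rebalance() round-robins records across the downstream subtasks instead.
        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) { return value; }
            }).setParallelism(3)
            .rebalance()
            .print().setParallelism(1);

        env.execute("parallelism-change");
    }
}
```

With a local environment this runs in an embedded mini-cluster and completes without the UnsupportedOperationException that forward() would raise.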

Example source: apache/flink

DataStream<Integer> head1 = iter1.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
DataStream<Integer> head2 = iter1.map(noOpIntMap).name("IterForwardMap");
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());

Example source: apache/flink

/**
 * Tests that the max parallelism is automatically set to the parallelism if it has not been
 * specified.
 */
@Test
public void testAutoMaxParallelism() {
  int globalParallelism = 42;
  int mapParallelism = 17;
  int maxParallelism = 21;
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(globalParallelism);
  DataStream<Integer> source = env.fromElements(1, 2, 3);
  DataStream<Integer> keyedResult1 = source.keyBy(value -> value).map(new NoOpIntMap());
  DataStream<Integer> keyedResult2 = keyedResult1.keyBy(value -> value).map(new NoOpIntMap()).setParallelism(mapParallelism);
  DataStream<Integer> keyedResult3 = keyedResult2.keyBy(value -> value).map(new NoOpIntMap()).setMaxParallelism(maxParallelism);
  DataStream<Integer> keyedResult4 = keyedResult3.keyBy(value -> value).map(new NoOpIntMap()).setMaxParallelism(maxParallelism).setParallelism(mapParallelism);
  keyedResult4.addSink(new DiscardingSink<>());
  StreamGraph graph = env.getStreamGraph();
  StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
  StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());
  assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
  assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
}

Example source: apache/flink

.setParallelism(parallelism / 2)
    .name("shuffle").rebalance();
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2)
    .addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());

Example source: apache/flink

assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
map.setParallelism(2);
assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
