Usage and code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.startNewChain() method

Reposted by x33g5p2x on 2022-01-30 in Other

This article collects Java code examples for the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.startNewChain() method and shows how it is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of SingleOutputStreamOperator.startNewChain() are as follows:
Package path: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class name: SingleOutputStreamOperator
Method name: startNewChain

Introduction to SingleOutputStreamOperator.startNewChain

Starts a new task chain beginning at this operator. This operator will not be chained (thread co-located for increased performance) to any previous tasks even if possible.
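
To make the description above concrete, here is a toy model in plain Java rather than real Flink code. It assumes a linear pipeline in which every operator would otherwise be chainable to its predecessor (same parallelism, forward connection, chaining enabled). The ChainSketch class, its Op type, and the chains helper are hypothetical names introduced only for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of operator chaining; this is NOT Flink's implementation. */
final class ChainSketch {

    /** Hypothetical descriptor: an operator name plus whether startNewChain() was called on it. */
    static final class Op {
        final String name;
        final boolean startsNewChain;

        Op(String name, boolean startsNewChain) {
            this.name = name;
            this.startsNewChain = startsNewChain;
        }
    }

    /** Groups a linear pipeline into chains; a flagged operator always opens a new group. */
    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        for (Op op : pipeline) {
            // Assumption: without the flag, every operator can join the previous chain.
            if (result.isEmpty() || op.startsNewChain) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(op.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Op> pipeline = Arrays.asList(
                new Op("Source", false),
                new Op("Filter", false),
                new Op("Map", true),    // as if map.startNewChain() had been called
                new Op("Sink", false));
        System.out.println(chains(pipeline)); // prints [[Source, Filter], [Map, Sink]]
    }
}
```

Note that the flagged operator is cut off only from its predecessor; operators that follow it may still join the chain it opens.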

Code examples

Code example source: origin: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
  /**
   * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map) -> StatefulMap3
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
  first.startNewChain();

  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
  second.startNewChain();

  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
  third.startNewChain();
}

Code example source: origin: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
  /**
   * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
  first.startNewChain();

  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(first);
  stateless.startNewChain();

  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
}

Code example source: origin: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
  /*
   * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> StatefulMap3)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
  first.startNewChain();

  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
  second.startNewChain();

  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, second);
}

Code example source: origin: apache/flink

@Override
public void createMigrationJob(StreamExecutionEnvironment env) {
  /**
   * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE);
  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.MIGRATE, source);
  first.startNewChain();
  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.MIGRATE, first);
  second.startNewChain();
  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.MIGRATE, stateless);
}

Code example source: origin: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
  /**
   * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map -> StatefulMap2)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
  first.startNewChain();

  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, first);
  third.startNewChain();

  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(third);

  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, stateless);
}

Code example source: origin: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
  /**
   * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> Map -> StatefulMap3)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
  first.startNewChain();

  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
  second.startNewChain();

  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

  // The chain grows by one extra stateless map compared with the original job.
  SingleOutputStreamOperator<Integer> stateless2 = createStatelessMap(stateless);

  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
}

Code example source: origin: apache/flink

@Override
protected void createRestoredJob(StreamExecutionEnvironment env) {
  /**
   * Source -> keyBy -> C(Window -> StatefulMap2) -> StatefulMap1
   */
  SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.RESTORE, source);

  SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.RESTORE, window);

  SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
  first.startNewChain();
}

Code example source: origin: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
  /**
   * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map -> StatefulMap3)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
  first.startNewChain();

  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);

  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
}

Code example source: origin: apache/flink

private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
  DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);

  stream
    .filter(ignored -> false).setParallelism(parallelism)
    .startNewChain()
    .print().setParallelism(parallelism);
}

Code example source: origin: apache/flink

// (start of the first statement is elided in this excerpt)
.filter(dummyFilter).slotSharingGroup("default").disableChaining()
.filter(dummyFilter).slotSharingGroup("group 1")
.filter(dummyFilter).startNewChain()
.print().disableChaining();

// (start of the second statement is elided in this excerpt)
.filter(dummyFilter).slotSharingGroup("default").disableChaining()
.filter(dummyFilter).slotSharingGroup("group 2")
.filter(dummyFilter).startNewChain()
.print().disableChaining();
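
The excerpt above mixes startNewChain() with disableChaining(), so a small plain-Java sketch of how the two differ may help. The strategy names mirror Flink's ChainingStrategy enum (HEAD for startNewChain(), NEVER for disableChaining()), but the grouping logic is a simplified assumption: it ignores parallelism, partitioning, and slot sharing groups, which also play a role in real Flink. ChainingStrategySketch, Op, and chains are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model contrasting startNewChain() with disableChaining(); not Flink code. */
final class ChainingStrategySketch {

    /** Names follow Flink's ChainingStrategy values; the logic below is a simplification. */
    enum Strategy { ALWAYS, HEAD, NEVER }

    /** Hypothetical operator descriptor used only by this sketch. */
    static final class Op {
        final String name;
        final Strategy strategy;

        Op(String name, Strategy strategy) {
            this.name = name;
            this.strategy = strategy;
        }
    }

    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        Strategy previous = null;
        for (Op op : pipeline) {
            // An operator joins the previous chain only if it accepts a predecessor
            // (ALWAYS) and the predecessor accepts a successor (anything but NEVER).
            boolean joinsPrevious = previous != null
                    && previous != Strategy.NEVER
                    && op.strategy == Strategy.ALWAYS;
            if (!joinsPrevious) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(op.name);
            previous = op.strategy;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Op> pipeline = Arrays.asList(
                new Op("A", Strategy.ALWAYS),
                new Op("B", Strategy.HEAD),   // as after B.startNewChain()
                new Op("C", Strategy.ALWAYS),
                new Op("D", Strategy.NEVER),  // as after D.disableChaining()
                new Op("E", Strategy.ALWAYS));
        System.out.println(chains(pipeline)); // prints [[A], [B, C], [D], [E]]
    }
}
```

In this model B still chains with C because HEAD only refuses a predecessor, whereas D stays alone because NEVER refuses neighbours on both sides.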

Code example source: origin: apache/flink

public static void main(String[] args) throws Exception {
  ParameterTool pt = ParameterTool.fromArgs(args);
  String savepointsPath = pt.getRequired("savepoint-path");
  Configuration config = new Configuration();
  config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointsPath);
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
  env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
  env.setRestartStrategy(RestartStrategies.noRestart());
  env.setStateBackend(new MemoryStateBackend());
  /**
   * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
   */
  DataStream<Integer> source = createSource(env, ExecutionMode.GENERATE);
  SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, source);
  first.startNewChain();
  SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
  second.startNewChain();
  SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
  SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.GENERATE, stateless);
  env.execute("job");
}

Code example source: origin: apache/flink

return value;
}).startNewChain()

Code example source: origin: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
  final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
  final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
  final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
  env.enableCheckpointing(200);
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // first vertex, chained to the source
      // this filter throttles the flow until at least one checkpoint
      // is complete, to make sure this program does not run without
      .filter(new StringRichFilterFunction())
      // -------------- second vertex - one-to-one connected ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - reducer and the sink ----------------
      .keyBy("prefix")
      .flatMap(new OnceFailingAggregator(failurePos))
      .addSink(new ValidatingSink());
}
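
In the program above, startNewChain() follows the first map, so it is that map (not the one after it) that opens the second vertex. The toy fluent builder below illustrates this placement rule in plain Java. FluentChainSketch and its methods are hypothetical stand-ins for the DataStream API, and the sketch assumes every operator is otherwise chainable to its predecessor.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy fluent builder; a hypothetical stand-in for the DataStream API, not Flink code. */
final class FluentChainSketch {
    private final List<List<String>> chains = new ArrayList<>();

    private FluentChainSketch(String source) {
        List<String> first = new ArrayList<>();
        first.add(source);
        chains.add(first);
    }

    static FluentChainSketch source(String name) {
        return new FluentChainSketch(name);
    }

    /** Appends an operator to the current chain (assumes it is chainable). */
    FluentChainSketch then(String name) {
        chains.get(chains.size() - 1).add(name);
        return this;
    }

    /** Moves the most recently added operator to the head of a new chain. */
    FluentChainSketch startNewChain() {
        List<String> current = chains.get(chains.size() - 1);
        if (current.size() > 1) { // otherwise the operator already heads its chain
            String last = current.remove(current.size() - 1);
            List<String> fresh = new ArrayList<>();
            fresh.add(last);
            chains.add(fresh);
        }
        return this;
    }

    List<List<String>> chains() {
        return chains;
    }

    public static void main(String[] args) {
        System.out.println(
                source("source").then("filter")
                        .then("map1").startNewChain() // applies to map1, not to map2
                        .then("map2")
                        .chains()); // prints [[source, filter], [map1, map2]]
    }
}
```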

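Code example source: origin: apache/flink

// (start of the first statement is elided in this excerpt)
.startNewChain()
.filter(new NoOpFilterFunction())
.addSink(new NoOpSinkFunction());

// (start of the second statement is elided in this excerpt)
.startNewChain()
.filter(new NoOpFilterFunction())
.startNewChain()
.addSink(new NoOpSinkFunction());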

Code example source: origin: apache/flink

return value;
}).startNewChain().addSink(new DiscardingSink<Integer>());

Code example source: origin: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // -------------- first vertex, chained to the source ----------------
      .filter(new StringRichFilterFunction())
      .shuffle()
      // -------------- second vertex - the stateful one that also fails ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - counter and the sink ----------------
      .keyBy("prefix")
      .map(new OnceFailingPrefixCounter(NUM_STRINGS))
      .addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
          // Do nothing here
        }
      });
}

Code example source: origin: apache/flink

.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(

Code example source: origin: apache/flink

.startNewChain()
.filter(new NoOpFilterFunction())
.addSink(new NoOpSinkFunction());

Code example source: origin: apache/flink

.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(

Code example source: origin: apache/flink

.startNewChain()
.map(new StatefulCounterFunction())
