This article collects code examples of the Java method org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.startNewChain() and shows how SingleOutputStreamOperator.startNewChain() is used in practice. The examples are extracted from curated projects on platforms such as GitHub, Stack Overflow, and Maven, so they make solid reference material. Details of SingleOutputStreamOperator.startNewChain() are as follows:

Package: org.apache.flink.streaming.api.datastream
Class: SingleOutputStreamOperator
Method: startNewChain
Description: Starts a new task chain beginning at this operator. This operator will not be chained (thread co-located for increased performance) to any previous tasks even if possible.
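Before the collected examples, here is a minimal, self-contained sketch of typical startNewChain() usage. The class name and pipeline are illustrative, not taken from any of the projects below; only the DataStream API calls are real:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StartNewChainExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without startNewChain(), the source and this map would normally be
        // fused (chained) into a single task for performance.
        SingleOutputStreamOperator<Long> doubled = env
                .generateSequence(1, 100)
                .map((MapFunction<Long, Long>) v -> v * 2)
                // Force a chain break: a new task chain begins at this operator,
                // so it will not be chained to the source task even if possible.
                .startNewChain();

        doubled.print();
        env.execute("startNewChain example");
    }
}
```

Note that startNewChain() only affects chaining with the *upstream* operator; downstream operators may still chain to this one (use disableChaining() to forbid chaining on both sides).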
Code example source: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map) -> StatefulMap3
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
    second.startNewChain();

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
    third.startNewChain();
}
Code example source: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(first);
    stateless.startNewChain();

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
}
Code example source: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> StatefulMap3)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
    second.startNewChain();

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, second);
}
Code example source: apache/flink

@Override
public void createMigrationJob(StreamExecutionEnvironment env) {
    /*
     * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.MIGRATE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.MIGRATE, first);
    second.startNewChain();

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.MIGRATE, stateless);
}
Code example source: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map -> StatefulMap2)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, first);
    third.startNewChain();

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(third);

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, stateless);
}
Code example source: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3 -> StatefulMap4)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
    second.startNewChain();

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
    SingleOutputStreamOperator<Integer> stateless2 = createStatelessMap(stateless);

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
}
Code example source: apache/flink

@Override
protected void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Source -> keyBy -> C(Window -> StatefulMap2) -> StatefulMap1
     */
    SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.RESTORE, source);

    SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.RESTORE, window);

    SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
    first.startNewChain();
}
Code example source: apache/flink

@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
    /*
     * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map -> StatefulMap3)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
}
Code example source: apache/flink

private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
    DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);

    stream
            .filter(ignored -> false).setParallelism(parallelism)
            .startNewChain()
            .print().setParallelism(parallelism);
}
Code example source: apache/flink

        .filter(dummyFilter).slotSharingGroup("default").disableChaining()
        .filter(dummyFilter).slotSharingGroup("group 1")
        .filter(dummyFilter).startNewChain()
        .print().disableChaining();

        .filter(dummyFilter).slotSharingGroup("default").disableChaining()
        .filter(dummyFilter).slotSharingGroup("group 2")
        .filter(dummyFilter).startNewChain()
        .print().disableChaining();
Code example source: apache/flink

public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);

    String savepointsPath = pt.getRequired("savepoint-path");

    Configuration config = new Configuration();
    config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointsPath);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
    env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.setStateBackend(new MemoryStateBackend());

    /*
     * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
     */
    DataStream<Integer> source = createSource(env, ExecutionMode.GENERATE);

    SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, source);
    first.startNewChain();

    SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
    second.startNewChain();

    SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);

    SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.GENERATE, stateless);

    env.execute("job");
}
Code example source: apache/flink

        return value;
    }).startNewChain()
Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);

    final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
    final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
    final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;

    env.enableCheckpointing(200);

    DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));

    stream
            // first vertex, chained to the source
            // this filter throttles the flow until at least one checkpoint
            // is complete, to make sure this program does not run without
            .filter(new StringRichFilterFunction())

            // -------------- seconds vertex - one-to-one connected ----------------
            .map(new StringPrefixCountRichMapFunction())
            .startNewChain()
            .map(new StatefulCounterFunction())

            // -------------- third vertex - reducer and the sink ----------------
            .keyBy("prefix")
            .flatMap(new OnceFailingAggregator(failurePos))
            .addSink(new ValidatingSink());
}
Code example source: apache/flink

        .startNewChain()
        .filter(new NoOpFilterFunction())
        .addSink(new NoOpSinkFunction());

        .startNewChain()
        .filter(new NoOpFilterFunction())
        .startNewChain()
        .addSink(new NoOpSinkFunction());
Code example source: apache/flink

        return value;
    }).startNewChain().addSink(new DiscardingSink<Integer>());
Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));

    stream
            // -------------- first vertex, chained to the source ----------------
            .filter(new StringRichFilterFunction())
            .shuffle()

            // -------------- seconds vertex - the stateful one that also fails ----------------
            .map(new StringPrefixCountRichMapFunction())
            .startNewChain()
            .map(new StatefulCounterFunction())

            // -------------- third vertex - counter and the sink ----------------
            .keyBy("prefix")
            .map(new OnceFailingPrefixCounter(NUM_STRINGS))
            .addSink(new SinkFunction<PrefixCount>() {
                @Override
                public void invoke(PrefixCount value) throws Exception {
                    // Do nothing here
                }
            });
}
Code example source: apache/flink

        .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
        .keyBy(0)
        .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
        .keyBy(0)
        .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
        .keyBy(0)
        .transform(
Code example source: apache/flink

        .startNewChain()
        .filter(new NoOpFilterFunction())
        .addSink(new NoOpSinkFunction());
Code example source: apache/flink

        .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
        .keyBy(0)
        .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
        .keyBy(0)
        .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
        .keyBy(0)
        .transform(
Code example source: apache/flink

        .startNewChain()
        .map(new StatefulCounterFunction())
Content is sourced from the internet; in case of infringement, please contact the author for removal.