本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.uid()
方法的一些代码示例,展示了SingleOutputStreamOperator.uid()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.uid()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:uid
[英]Sets an ID for this operator.
The specified ID is used to assign the same operator ID across job submissions (for example when starting a job from a savepoint).
Important: this ID needs to be unique per transformation and job. Otherwise, job submission will fail.
[中]
代码示例来源:origin: apache/flink
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
String name,
JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
KeyedStream<Event, Integer> source,
List<TypeSerializer<ComplexPayload>> stateSer,
List<Class<ComplexPayload>> stateClass) {
return source
.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
.name(name)
.uid(name)
.returns(Event.class)
.keyBy(Event::getKey);
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "third"))
.setParallelism(4)
.uid("third");
return map;
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
return input
.map(new StatefulStringStoringMap(mode, "second"))
.setParallelism(4)
.uid("second");
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "second"))
.setParallelism(4)
.uid("second");
return map;
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4)
.uid("first");
return map;
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
return input
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4)
.uid("first");
}
代码示例来源:origin: apache/flink
/**
* If expected values ever change double check that the change is not braking the contract of
* {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
*/
@Test
public void testGetOperatorUniqueID() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.fromElements(1, 2, 3)
.map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
.map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");
env.execute();
}
代码示例来源:origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey);
List<TypeSerializer<ComplexPayload>> stateSer =
Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
afterStatefulOperations
.flatMap(createSemanticsCheckMapper(pt))
.name("SemanticsCheckMapper")
.addSink(new PrintSinkFunction<>());
env.execute("General purpose test job");
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
return input
.keyBy(0)
.countWindow(1)
.apply(new StatefulWindowFunction(mode))
.setParallelism(4)
.uid("window");
}
代码示例来源:origin: apache/flink
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002");
Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
.name(OPERATOR_STATE_OPER_NAME).uid("0004");
}).name(TIME_WINDOW_OPER_NAME).uid("0005");
.map(createFailureMapper(pt))
.setParallelism(1)
.name(FAILURE_MAPPER_NAME).uid("0006");
.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.addSink(new PrintSinkFunction<>())
.uid("008");
.uid("009");
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>())
代码示例来源:origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final String checkpointDir = pt.getRequired("checkpoint.dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend(checkpointDir));
env.setRestartStrategy(RestartStrategies.noRestart());
env.enableCheckpointing(1000L);
env.getConfig().disableGenericTypes();
env.addSource(new MySource()).uid("my-source")
.keyBy(anInt -> 0)
.map(new MyStatefulFunction()).uid("my-map")
.addSink(new DiscardingSink<>()).uid("my-sink");
env.execute();
}
}
代码示例来源:origin: apache/flink
.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
"custom_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
代码示例来源:origin: apache/flink
/**
* Tests that a manual hash for an intermediate chain node is accepted.
*/
@Test
public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
env.addSource(new NoOpSourceFunction())
// Intermediate chained node
.map(new NoOpMapFunction()).uid("map")
.addSink(new NoOpSinkFunction());
env.getStreamGraph().getJobGraph();
}
代码示例来源:origin: apache/flink
.process(firstBroadcastFunction).uid("BrProcess1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.process(secondBroadcastFunction).uid("BrProcess2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
代码示例来源:origin: apache/flink
/**
* Tests that a collision on the manual hash throws an Exception.
*/
@Test(expected = IllegalArgumentException.class)
public void testManualHashAssignmentCollisionThrowsException() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
env.disableOperatorChaining();
env.addSource(new NoOpSourceFunction()).uid("source")
.map(new NoOpMapFunction()).uid("source") // Collision
.addSink(new NoOpSinkFunction());
// This call is necessary to generate the job graph
env.getStreamGraph().getJobGraph();
}
代码示例来源:origin: apache/flink
.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
"custom_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
代码示例来源:origin: apache/flink
.map(value -> 4 * value)
.shuffle()
.map(statefulCounter).uid("statefulCounter")
.shuffle()
.map(value -> 2 * value)
.map(new StatefulCounter()).uid("statefulCounter")
.shuffle()
.map(value -> value)
代码示例来源:origin: apache/flink
.addSource(nonParallelSource).uid("CheckpointingSource1")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap1")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
timelyOperator).uid("CheckpointingTimelyStatefulOperator1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.addSource(parallelSource).uid("CheckpointingSource2")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap2")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
timelyOperator).uid("CheckpointingTimelyStatefulOperator2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
代码示例来源:origin: apache/flink
.name("source").uid("source");
.name("source").uid("source");
代码示例来源:origin: king/bravo
public DataStream<String> constructTestPipeline(DataStream<String> source) {
return source
.map(Integer::parseInt)
.returns(Integer.class)
.keyBy(i -> i)
.map(new StatefulCounter())
.uid("hello")
.map(Tuple2::toString)
.returns(String.class);
}
内容来源于网络,如有侵权,请联系作者删除!