org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.uid()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(10.7k)|赞(0)|评价(0)|浏览(232)

本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.uid()方法的一些代码示例,展示了SingleOutputStreamOperator.uid()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.uid()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:uid

SingleOutputStreamOperator.uid介绍

[英]Sets an ID for this operator.

The specified ID is used to assign the same operator ID across job submissions (for example when starting a job from a savepoint).

Important: this ID needs to be unique per transformation and job. Otherwise, job submission will fail.
[中]

代码示例

代码示例来源:origin: apache/flink

private static KeyedStream<Event, Integer> applyTestStatefulOperator(
  String name,
  JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
  KeyedStream<Event, Integer> source,
  List<TypeSerializer<ComplexPayload>> stateSer,
  List<Class<ComplexPayload>> stateClass) {
  return source
    .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
    .name(name)
    .uid(name)
    .returns(Event.class)
    .keyBy(Event::getKey);
}

代码示例来源:origin: apache/flink

public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  SingleOutputStreamOperator<Integer> map = input
    .map(new StatefulStringStoringMap(mode, "third"))
    .setParallelism(4)
    .uid("third");
  return map;
}

代码示例来源:origin: apache/flink

public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  return input
    .map(new StatefulStringStoringMap(mode, "second"))
    .setParallelism(4)
    .uid("second");
}

代码示例来源:origin: apache/flink

public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  SingleOutputStreamOperator<Integer> map = input
    .map(new StatefulStringStoringMap(mode, "second"))
    .setParallelism(4)
    .uid("second");
  return map;
}

代码示例来源:origin: apache/flink

public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  SingleOutputStreamOperator<Integer> map = input
    .map(new StatefulStringStoringMap(mode, "first"))
    .setParallelism(4)
    .uid("first");
  return map;
}

代码示例来源:origin: apache/flink

public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  return input
    .map(new StatefulStringStoringMap(mode, "first"))
    .setParallelism(4)
    .uid("first");
}

代码示例来源:origin: apache/flink

/**
 * If expected values ever change double check that the change is not braking the contract of
 * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
 */
@Test
public void testGetOperatorUniqueID() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.fromElements(1, 2, 3)
    .map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
    .map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");
  env.execute();
}

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

代码示例来源:origin: apache/flink

public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
  return input
    .keyBy(0)
    .countWindow(1)
    .apply(new StatefulWindowFunction(mode))
    .setParallelism(4)
    .uid("window");
}

代码示例来源:origin: apache/flink

Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002");
    Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
.name(OPERATOR_STATE_OPER_NAME).uid("0004");
}).name(TIME_WINDOW_OPER_NAME).uid("0005");
  .map(createFailureMapper(pt))
  .setParallelism(1)
  .name(FAILURE_MAPPER_NAME).uid("0006");
.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.addSink(new PrintSinkFunction<>())
.uid("008");
.uid("009");
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>())

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    final String checkpointDir = pt.getRequired("checkpoint.dir");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(checkpointDir));
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.enableCheckpointing(1000L);
    env.getConfig().disableGenericTypes();

    env.addSource(new MySource()).uid("my-source")
        .keyBy(anInt -> 0)
        .map(new MyStatefulFunction()).uid("my-map")
        .addSink(new DiscardingSink<>()).uid("my-sink");
    env.execute();
  }
}

代码示例来源:origin: apache/flink

.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
  "custom_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
  "timely_stateful_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());

代码示例来源:origin: apache/flink

/**
 * Tests that a manual hash for an intermediate chain node is accepted.
 */
@Test
public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.addSource(new NoOpSourceFunction())
      // Intermediate chained node
      .map(new NoOpMapFunction()).uid("map")
      .addSink(new NoOpSinkFunction());
  env.getStreamGraph().getJobGraph();
}

代码示例来源:origin: apache/flink

.process(firstBroadcastFunction).uid("BrProcess1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.process(secondBroadcastFunction).uid("BrProcess2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

代码示例来源:origin: apache/flink

/**
 * Tests that a collision on the manual hash throws an Exception.
 */
@Test(expected = IllegalArgumentException.class)
public void testManualHashAssignmentCollisionThrowsException() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.disableOperatorChaining();
  env.addSource(new NoOpSourceFunction()).uid("source")
      .map(new NoOpMapFunction()).uid("source") // Collision
      .addSink(new NoOpSinkFunction());
  // This call is necessary to generate the job graph
  env.getStreamGraph().getJobGraph();
}

代码示例来源:origin: apache/flink

.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
  "custom_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
  "timely_stateful_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());

代码示例来源:origin: apache/flink

.map(value -> 4 * value)
.shuffle()
.map(statefulCounter).uid("statefulCounter")
.shuffle()
.map(value -> 2 * value)
.map(new StatefulCounter()).uid("statefulCounter")
.shuffle()
.map(value -> value)

代码示例来源:origin: apache/flink

.addSource(nonParallelSource).uid("CheckpointingSource1")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap1")
.keyBy(0)
.transform(
  "timely_stateful_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  timelyOperator).uid("CheckpointingTimelyStatefulOperator1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.addSource(parallelSource).uid("CheckpointingSource2")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap2")
.keyBy(0)
.transform(
  "timely_stateful_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  timelyOperator).uid("CheckpointingTimelyStatefulOperator2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

代码示例来源:origin: apache/flink

.name("source").uid("source");
.name("source").uid("source");

代码示例来源:origin: king/bravo

public DataStream<String> constructTestPipeline(DataStream<String> source) {
  return source
      .map(Integer::parseInt)
      .returns(Integer.class)
      .keyBy(i -> i)
      .map(new StatefulCounter())
      .uid("hello")
      .map(Tuple2::toString)
      .returns(String.class);
}

相关文章

微信公众号

最新文章

更多