org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.name()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(175)

本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.name()方法的一些代码示例,展示了SingleOutputStreamOperator.name()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.name()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:name

SingleOutputStreamOperator.name介绍

[英]Sets the name of the current data stream. This name is used by the visualization and logging during runtime.
[中]设置当前数据流的名称。运行时可视化和日志记录使用此名称。

代码示例

代码示例来源:origin: apache/flink

/**
   * A thin wrapper layer over {@link SingleOutputStreamOperator#name(String)} .
   *
   * @param name operator name
   * @return The named operator.
   */
  public PythonSingleOutputStreamOperator name(String name) {
    this.stream.name(name);
    return this;
  }
}

代码示例来源:origin: apache/flink

private static KeyedStream<Event, Integer> applyTestStatefulOperator(
  String name,
  JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
  KeyedStream<Event, Integer> source,
  List<TypeSerializer<ComplexPayload>> stateSer,
  List<Class<ComplexPayload>> stateClass) {
  return source
    .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
    .name(name)
    .uid(name)
    .returns(Event.class)
    .keyBy(Event::getKey);
}

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

代码示例来源:origin: apache/flink

Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002");
    Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
.name(OPERATOR_STATE_OPER_NAME).uid("0004");
}).name(TIME_WINDOW_OPER_NAME).uid("0005");
  .map(createFailureMapper(pt))
  .setParallelism(1)
  .name(FAILURE_MAPPER_NAME).uid("0006");
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.addSink(new PrintSinkFunction<>())
.name(SLIDING_WINDOW_AGG_NAME)
.uid("009");
.flatMap(createSlidingWindowCheckMapper(pt))
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>())
.uid("011");

代码示例来源:origin: apache/flink

return null;
}).name("testMap");
    return null;
}).name("testMap");
  public void flatMap2(Long value, Collector<Long> out) throws Exception {}
}).name("testCoFlatMap")
.name("testWindowFold")
.print();

代码示例来源:origin: apache/flink

@Test
public void testUserProvidedHashing() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
  env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
      .map(new NoOpMapFunction())
      .filter(new NoOpFilterFunction())
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
  StreamGraph streamGraph = env.getStreamGraph();
  int idx = 1;
  for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
    List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
    Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
    --idx;
  }
}

代码示例来源:origin: apache/flink

.name("A")
.map(value -> value)
  .setBufferTimeout(0)
  .name("B")
.map(value -> value)
  .setBufferTimeout(12)
  .name("C")
.map(value -> value)
  .name("D");

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
  TtlTestConfig config = TtlTestConfig.fromArgs(pt);
  StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
    .cleanupIncrementally(5, true)
    .cleanupFullSnapshot()
    .build();
  env
    .addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
    .name("TtlStateUpdateSource")
    .keyBy(TtlStateUpdate::getKey)
    .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
    .name("TtlVerifyUpdateFunction")
    .addSink(new PrintSinkFunction<>())
    .name("PrintFailedVerifications");
  env.execute("State TTL test job");
}

代码示例来源:origin: apache/flink

.map(noOpStrMap).name("ParallelizeMap");
.map(noOpIntMap).name("ParallelizeMap")
.iterate(2000 * timeoutScale)
.withFeedbackType(Types.STRING);

代码示例来源:origin: apache/flink

out.collect(value);
}).name("test_flatMap");
opMethod.invoke(flatMap, resource3);
    return false;
}).name("test_filter");
opMethod.invoke(increment, resource4);

代码示例来源:origin: apache/flink

@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
  int numRetries = 5;
  int timeoutScale = 1;
  for (int numRetry = 0; numRetry < numRetries; numRetry++) {
    try {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      iterated = new boolean[parallelism];
      DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism * 2, false))
          .map(noOpBoolMap).name("ParallelizeMap");
      IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
      DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(noOpBoolMap);
      iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink());
      iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
      env.execute();
      for (boolean iter : iterated) {
        assertTrue(iter);
      }
      break; // success
    } catch (Throwable t) {
      LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
      if (numRetry >= numRetries - 1) {
        throw t;
      } else {
        timeoutScale *= 2;
      }
    }
  }
}

代码示例来源:origin: apache/flink

.map(noOpIntMap).name("ParallelizeMapShuffle");
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
    .map(noOpIntMap).name("ParallelizeMapRebalance");
DataStream<Integer> head1 = iter1.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
DataStream<Integer> head2 = iter1.map(noOpIntMap).name("IterForwardMap");
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
    .map(noOpIntMap).name("EvenOddSourceMap")
    .split(new EvenOddOutputSelector());

代码示例来源:origin: apache/flink

DataStream<Integer> head1 = iter1.map(noOpIntMap).name("map1");
DataStream<Integer> head2 = iter1.map(noOpIntMap)
    .setParallelism(parallelism / 2)
    .name("shuffle").rebalance();
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2)
    .addSink(new ReceiveCheckNoOpSink<Integer>());
    .name("split")
    .split(new EvenOddOutputSelector());
        head1.map(noOpIntMap).name("bc").broadcast(),
        head2.map(noOpIntMap).shuffle()));

代码示例来源:origin: apache/flink

.map(noOpBoolMap).name("ParallelizeMap");

代码示例来源:origin: apache/flink

.map(new NoOpMapFunction()).name("map")
.startNewChain()
.filter(new NoOpFilterFunction())
.map(new NoOpMapFunction()).name("map")
.startNewChain()
.filter(new NoOpFilterFunction())

代码示例来源:origin: apache/flink

@Test
public void testUserProvidedHashingOnChainSupported() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
      .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
      .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
  env.getStreamGraph().getJobGraph();
}

代码示例来源:origin: apache/flink

.map(noOpIntMap).name("ParallelizeMap");

代码示例来源:origin: apache/flink

.name("source").uid("source");

代码示例来源:origin: apache/flink

.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce");
.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce");

代码示例来源:origin: apache/flink

return null;
}).name("MyMap");

相关文章

微信公众号

最新文章

更多