This article collects Java code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.name() method, showing how SingleOutputStreamOperator.name() is used in practice. The examples are extracted from curated open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of SingleOutputStreamOperator.name():

Package: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class: SingleOutputStreamOperator
Method: name

Sets the name of the current data stream. This name is used by the visualization and logging during runtime.
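Before the extracted examples, here is a minimal self-contained sketch of typical usage, assuming a standard Flink DataStream setup; the operator names ("UppercaseMapper", "PrintSink") and the job name are illustrative, not taken from any of the projects below:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NameExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each name() call labels the operator produced by the preceding
        // transformation; the label replaces the auto-generated name in the
        // Flink web UI and in runtime logs.
        DataStream<String> upper = env
            .fromElements("a", "b", "c")
            .map(String::toUpperCase)
            .name("UppercaseMapper"); // names the map operator

        upper.print().name("PrintSink"); // sinks can be named as well

        env.execute("name() example");
    }
}
```

Note that name() only affects display and logging; for stable state restoration across job upgrades, uid() should be set in addition, as several of the examples below do.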
Code example from: apache/flink

/**
 * A thin wrapper layer over {@link SingleOutputStreamOperator#name(String)}.
 *
 * @param name operator name
 * @return The named operator.
 */
public PythonSingleOutputStreamOperator name(String name) {
    this.stream.name(name);
    return this;
}
Code example from: apache/flink

private static KeyedStream<Event, Integer> applyTestStatefulOperator(
        String name,
        JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
        KeyedStream<Event, Integer> source,
        List<TypeSerializer<ComplexPayload>> stateSer,
        List<Class<ComplexPayload>> stateClass) {
    return source
        .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
        .name(name)
        .uid(name)
        .returns(Event.class)
        .keyBy(Event::getKey);
}
Code example from: apache/flink

public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    setupEnvironment(env, pt);

    KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
        .name("EventSource")
        .uid("EventSource")
        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
        .keyBy(Event::getKey);

    List<TypeSerializer<ComplexPayload>> stateSer =
        Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));

    KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt)
        ? applyOriginalStatefulOperations(source, stateSer, Collections.emptyList())
        : applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());

    afterStatefulOperations
        .flatMap(createSemanticsCheckMapper(pt))
        .name("SemanticsCheckMapper")
        .addSink(new PrintSinkFunction<>());

    env.execute("General purpose test job");
}
Code example from: apache/flink (fragment)
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002");
Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
.name(OPERATOR_STATE_OPER_NAME).uid("0004");
}).name(TIME_WINDOW_OPER_NAME).uid("0005");
.map(createFailureMapper(pt))
.setParallelism(1)
.name(FAILURE_MAPPER_NAME).uid("0006");
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.addSink(new PrintSinkFunction<>())
.name(SLIDING_WINDOW_AGG_NAME)
.uid("009");
.flatMap(createSlidingWindowCheckMapper(pt))
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>())
.uid("011");
Code example from: apache/flink (fragment)
return null;
}).name("testMap");
return null;
}).name("testMap");
public void flatMap2(Long value, Collector<Long> out) throws Exception {}
}).name("testCoFlatMap")
.name("testWindowFold")
.print();
Code example from: apache/flink

@Test
public void testUserProvidedHashing() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");

    env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
        .map(new NoOpMapFunction())
        .filter(new NoOpFilterFunction())
        .keyBy(new NoOpKeySelector())
        .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));

    StreamGraph streamGraph = env.getStreamGraph();
    int idx = 1;
    for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
        List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
        Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
        --idx;
    }
}
Code example from: apache/flink (fragment)
.name("A")
.map(value -> value)
.setBufferTimeout(0)
.name("B")
.map(value -> value)
.setBufferTimeout(12)
.name("C")
.map(value -> value)
.name("D");
Code example from: apache/flink

public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    setupEnvironment(env, pt);

    final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);

    TtlTestConfig config = TtlTestConfig.fromArgs(pt);
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
        .cleanupIncrementally(5, true)
        .cleanupFullSnapshot()
        .build();

    env
        .addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
        .name("TtlStateUpdateSource")
        .keyBy(TtlStateUpdate::getKey)
        .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
        .name("TtlVerifyUpdateFunction")
        .addSink(new PrintSinkFunction<>())
        .name("PrintFailedVerifications");

    env.execute("State TTL test job");
}
Code example from: apache/flink (fragment)
.map(noOpStrMap).name("ParallelizeMap");
.map(noOpIntMap).name("ParallelizeMap")
.iterate(2000 * timeoutScale)
.withFeedbackType(Types.STRING);
Code example from: apache/flink (fragment)
out.collect(value);
}).name("test_flatMap");
opMethod.invoke(flatMap, resource3);
return false;
}).name("test_filter");
opMethod.invoke(increment, resource4);
Code example from: apache/flink

@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
    int numRetries = 5;
    int timeoutScale = 1;

    for (int numRetry = 0; numRetry < numRetries; numRetry++) {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            iterated = new boolean[parallelism];

            DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism * 2, false))
                .map(noOpBoolMap).name("ParallelizeMap");

            IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);

            DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(noOpBoolMap);

            iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink());
            iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());

            env.execute();

            for (boolean iter : iterated) {
                assertTrue(iter);
            }
            break; // success
        } catch (Throwable t) {
            LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
            if (numRetry >= numRetries - 1) {
                throw t;
            } else {
                timeoutScale *= 2;
            }
        }
    }
}
Code example from: apache/flink (fragment)
.map(noOpIntMap).name("ParallelizeMapShuffle");
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
.map(noOpIntMap).name("ParallelizeMapRebalance");
DataStream<Integer> head1 = iter1.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
DataStream<Integer> head2 = iter1.map(noOpIntMap).name("IterForwardMap");
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
.map(noOpIntMap).name("EvenOddSourceMap")
.split(new EvenOddOutputSelector());
Code example from: apache/flink (fragment)
DataStream<Integer> head1 = iter1.map(noOpIntMap).name("map1");
DataStream<Integer> head2 = iter1.map(noOpIntMap)
.setParallelism(parallelism / 2)
.name("shuffle").rebalance();
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2)
.addSink(new ReceiveCheckNoOpSink<Integer>());
.name("split")
.split(new EvenOddOutputSelector());
head1.map(noOpIntMap).name("bc").broadcast(),
head2.map(noOpIntMap).shuffle()));
Code example from: apache/flink (fragment)
.map(noOpBoolMap).name("ParallelizeMap");
Code example from: apache/flink (fragment)
.map(new NoOpMapFunction()).name("map")
.startNewChain()
.filter(new NoOpFilterFunction())
.map(new NoOpMapFunction()).name("map")
.startNewChain()
.filter(new NoOpFilterFunction())
Code example from: apache/flink

@Test
public void testUserProvidedHashingOnChainSupported() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
        .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
        .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
        .keyBy(new NoOpKeySelector())
        .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");

    env.getStreamGraph().getJobGraph();
}
Code example from: apache/flink (fragment)
.map(noOpIntMap).name("ParallelizeMap");
Code example from: apache/flink (fragment)
.name("source").uid("source");
Code example from: apache/flink (fragment)
.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce");
.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce");
Code example from: apache/flink (fragment)
return null;
}).name("MyMap");