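This article collects code examples of the Java method org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.addSink() and shows how SingleOutputStreamOperator.addSink() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow and Maven. They were extracted from selected projects and should serve as a useful reference. Details of the SingleOutputStreamOperator.addSink() method: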
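Fully qualified class name: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class name: SingleOutputStreamOperator
Method name: addSink
Description: none provided. (addSink is inherited from DataStream: it takes a SinkFunction and returns a DataStreamSink, on which several examples below call name(), uid() or setParallelism().)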
Code example source: origin: apache/flink
@SuppressWarnings({"rawtypes", "unchecked"})
private static Integer createDownStreamId(ConnectedStreams dataStream) {
SingleOutputStreamOperator<?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object map1(Tuple2<Long, Long> value) {
return null;
}
@Override
public Object map2(Tuple2<Long, Long> value) {
return null;
}
});
coMap.addSink(new DiscardingSink());
return coMap.getId();
}
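The first example attaches a DiscardingSink, apparently just so that the co-map operator has a downstream consumer. As a rough mental model of what addSink attaches, here is a plain-Java sketch with hypothetical names (MiniSink, DiscardingMiniSink, ListMiniSink, MiniPipeline). It mimics the roles of SinkFunction, DiscardingSink and the test helper TestListResultSink seen in later examples, but it is not Flink code and ignores parallelism, checkpointing and lifecycle methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a sink: a terminal consumer invoked once per record.
interface MiniSink<T> {
    void invoke(T value);
}

// Drops every record; useful only to give an operator a consumer.
final class DiscardingMiniSink<T> implements MiniSink<T> {
    @Override
    public void invoke(T value) {
        // intentionally empty
    }
}

// Collects records so a test can inspect them after the run.
final class ListMiniSink<T extends Comparable<T>> implements MiniSink<T> {
    private final List<T> received = new ArrayList<>();

    @Override
    public synchronized void invoke(T value) {
        received.add(value);
    }

    // Sorted copy, because parallel sinks would receive records in no fixed order.
    public synchronized List<T> getSortedResult() {
        List<T> copy = new ArrayList<>(received);
        Collections.sort(copy);
        return copy;
    }
}

final class MiniPipeline {
    // Pushes each element through `map` and hands the result to `sink`,
    // roughly what source -> map -> addSink(sink) does in a single instance.
    static <I, O> void run(List<I> input, Function<I, O> map, MiniSink<O> sink) {
        for (I element : input) {
            sink.invoke(map.apply(element));
        }
    }
}
```

Running MiniPipeline.run over [3, 1, 2] with x -> x * 10 into a ListMiniSink gives a sorted result of [10, 20, 30]; the same run into a DiscardingMiniSink produces nothing observable.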
Code example source: origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey);
List<TypeSerializer<ComplexPayload>> stateSer =
Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
afterStatefulOperations
.flatMap(createSemanticsCheckMapper(pt))
.name("SemanticsCheckMapper")
.addSink(new PrintSinkFunction<>());
env.execute("General purpose test job");
}
Code example source: origin: apache/flink
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
stream
.keyBy(0)
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.reduce(new SummingReducer())
// alternative: use an apply function which does not pre-aggregate
// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
// .apply(new SummingWindowFunction())
.addSink(new SinkFunction<Tuple2<Long, Long>>() {
@Override
public void invoke(Tuple2<Long, Long> value) {
}
});
env.execute();
}
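In the example above, timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) defines sliding windows 2500 ms long that start every 500 ms, so each element is summed into 2500 / 500 = 5 overlapping windows. The plain-Java sketch below (hypothetical class name, window offset assumed to be zero) shows that assignment arithmetic; it illustrates the idea and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment (no Flink dependency).
final class SlidingWindowSketch {

    // Start of the latest slide-aligned window that contains `timestamp` (offset 0).
    static long windowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    // Returns [start, end) pairs of every window of length `size` that contains `timestamp`.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For a timestamp of 3000 ms it yields the windows starting at 3000, 2500, 2000, 1500 and 1000 ms, each 2500 ms long.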
Code example source: origin: apache/flink
/**
* Tests that there are no collisions with two identical intermediate nodes connected to the
* same predecessor.
*
* <pre>
* /-> [ (map) ] -> [ (sink) ]
* [ (src) ] -+
* \-> [ (map) ] -> [ (sink) ]
* </pre>
*/
@Test
public void testNodeHashIdenticalNodes() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
env.disableOperatorChaining();
DataStream<String> src = env.addSource(new NoOpSourceFunction());
src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
Set<JobVertexID> vertexIds = new HashSet<>();
for (JobVertex vertex : jobGraph.getVertices()) {
assertTrue(vertexIds.add(vertex.getID()));
}
}
Code example source: origin: apache/flink
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new TestSource()).map(new TestMap()).addSink(new DiscardingSink<Integer>());
env.execute();
assertNotEquals(srcContext, mapContext);
}
Code example source: origin: apache/flink
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
String outputPath = params.getRequired("outputPath");
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
Time.of(10, TimeUnit.SECONDS)
));
sEnv.enableCheckpointing(4000);
final int idlenessMs = 10;
// define bucketing sink to emit the result
BucketingSink<Tuple4<Integer, Long, Integer, String>> sink = new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath)
.setBucketer(new KeyBucketer());
// generate data, shuffle, perform stateful operation, sink
sEnv.addSource(new Generator(10, idlenessMs, 60))
.keyBy(0)
.map(new SubtractingMapper(-1L * idlenessMs))
.addSink(sink);
sEnv.execute();
}
Code example source: origin: apache/flink
/**
* Test ProcessFunction side output.
*/
@Test
public void testProcessFunctionSideOutput() throws Exception {
final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(3);
DataStream<Integer> dataStream = see.fromCollection(elements);
SingleOutputStreamOperator<Integer> passThroughStream = dataStream
.process(new ProcessFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(
Integer value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value);
ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
}
});
passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
passThroughStream.addSink(resultSink);
see.execute();
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult());
assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
}
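The side-output test above sends each element to the main output with out.collect(...) and a derived string to the side output with ctx.output(sideOutputTag, ...), then reads the side stream back through getSideOutput(sideOutputTag). A minimal single-threaded sketch of that routing, with hypothetical names and a plain String tag standing in for OutputTag:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a process function with one side output.
final class SideOutputSketch {

    // Simplified stand-in for ProcessFunction.Context: only the side-output channel.
    interface Context {
        void output(String tag, String value);
    }

    static final class Result {
        final List<Integer> main = new ArrayList<>();
        final List<String> side = new ArrayList<>();
    }

    static Result run(List<Integer> input, String sideTag) {
        Result result = new Result();
        // Records emitted under the matching tag land in the side list.
        Context ctx = (tag, value) -> {
            if (tag.equals(sideTag)) {
                result.side.add(value);
            }
        };
        for (Integer value : input) {
            result.main.add(value);                  // like out.collect(value)
            ctx.output(sideTag, "sideout-" + value); // like ctx.output(sideOutputTag, ...)
        }
        return result;
    }
}
```

Unlike the Flink job, this sketch is single-threaded, so both lists come out in input order and need no sorting.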
Code example source: origin: apache/flink
/**
* Tests that a manual hash at the beginning of a chain is accepted.
*/
@Test
public void testManualHashAssignmentForStartNodeInChain() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
env.addSource(new NoOpSourceFunction()).uid("source")
.map(new NoOpMapFunction())
.addSink(new NoOpSinkFunction());
env.getStreamGraph().getJobGraph();
}
Code example source: origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final String checkpointDir = pt.getRequired("checkpoint.dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend(checkpointDir));
env.setRestartStrategy(RestartStrategies.noRestart());
env.enableCheckpointing(1000L);
env.getConfig().disableGenericTypes();
env.addSource(new MySource()).uid("my-source")
.keyBy(anInt -> 0)
.map(new MyStatefulFunction()).uid("my-map")
.addSink(new DiscardingSink<>()).uid("my-sink");
env.execute();
}
Code example source: origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
TtlTestConfig config = TtlTestConfig.fromArgs(pt);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
.cleanupIncrementally(5, true)
.cleanupFullSnapshot()
.build();
env
.addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");
env.execute("State TTL test job");
}
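The TTL job above configures state expiry through StateTtlConfig and injects a controllable clock (MonotonicTTLTimeProvider) so that expiry can be verified. The core rule, that a value stops being visible once its age reaches the TTL, can be sketched in plain Java with an injectable clock. The class below is hypothetical: it measures age from the last write only and cleans up lazily on read, a simplification rather than Flink's TTL state backend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical per-key store with a time-to-live, read through an injectable clock.
final class TtlStoreSketch<K, V> {

    private static final class Entry<T> {
        final T value;
        final long lastWrite;

        Entry(T value, long lastWrite) {
            this.value = value;
            this.lastWrite = lastWrite;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlStoreSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Returns null once the entry's age reaches the TTL, dropping it (lazy cleanup on read).
    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() - e.lastWrite >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }
}
```

With a TTL of 100 ms, a value written at time 0 is still readable at 99 ms and gone at 100 ms. Injecting the clock is what makes such a check deterministic.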
Code example source: origin: apache/flink
/**
* Tests that a manual hash for an intermediate chain node is accepted.
*/
@Test
public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
env.addSource(new NoOpSourceFunction())
// Intermediate chained node
.map(new NoOpMapFunction()).uid("map")
.addSink(new NoOpSinkFunction());
env.getStreamGraph().getJobGraph();
}
Code example source: origin: apache/flink
/**
* Runs the following program.
* <pre>
* [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
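// Math.floorMod keeps the offset in [0, failurePosMax - failurePosMin); a plain % can be negative
final long failurePos = Math.floorMod(new Random().nextLong(), failurePosMax - failurePosMin) + failurePosMin;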
env.enableCheckpointing(200);
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
// first vertex, chained to the source
// this filter throttles the flow until at least one checkpoint
// is complete, to make sure this program does not run without any completed checkpoint
.filter(new StringRichFilterFunction())
// -------------- second vertex - one-to-one connected ----------------
.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())
// -------------- third vertex - reducer and the sink ----------------
.keyBy("prefix")
.flatMap(new OnceFailingAggregator(failurePos))
.addSink(new ValidatingSink());
}
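Choosing a random failure position between failurePosMin and failurePosMax depends on how the random offset is reduced. Java's % takes the sign of the dividend, so nextLong() % range lies in (-range, range), while Math.floorMod(nextLong(), range) lies in [0, range). A small sketch contrasting the two, with a hypothetical class name and assuming the intended position is in [min, max):

```java
import java.util.Random;

// Contrasts truncating % with Math.floorMod when mapping a random long into a range.
final class RandomOffsetSketch {

    static long truncatingOffset(long random, long range) {
        return random % range;               // result in (-range, range)
    }

    static long flooringOffset(long random, long range) {
        return Math.floorMod(random, range); // result in [0, range)
    }

    // Picks a position in [min, max).
    static long pickFailurePos(Random rnd, long min, long max) {
        return flooringOffset(rnd.nextLong(), max - min) + min;
    }
}
```

For example, -7 % 3 is -1, whereas Math.floorMod(-7, 3) is 2.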
Code example source: origin: apache/flink
@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
int numRetries = 5;
int timeoutScale = 1;
for (int numRetry = 0; numRetry < numRetries; numRetry++) {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
iterated = new boolean[parallelism];
DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism * 2, false))
.map(noOpBoolMap).name("ParallelizeMap");
IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(noOpBoolMap);
iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink());
iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
env.execute();
for (boolean iter : iterated) {
assertTrue(iter);
}
break; // success
} catch (Throwable t) {
LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
if (numRetry >= numRetries - 1) {
throw t;
} else {
timeoutScale *= 2;
}
}
}
}
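The iteration test above retries a flaky job up to numRetries times, doubling timeoutScale after each failure and rethrowing the last error once the retries run out. Factored into a plain-Java helper (hypothetical names; in this sketch attempts throw only unchecked exceptions), the pattern looks like this:

```java
// Plain-Java sketch of retry-with-doubling-timeout (hypothetical names).
final class RetrySketch {

    // One attempt; receives the current timeout multiplier (1, 2, 4, ...).
    interface Attempt {
        void run(int timeoutScale);
    }

    // Returns how many attempts were needed; rethrows the final failure.
    static int runWithRetries(int numRetries, Attempt attempt) {
        int timeoutScale = 1;
        for (int numRetry = 0; ; numRetry++) {
            try {
                attempt.run(timeoutScale);
                return numRetry + 1;
            } catch (Throwable t) { // also catches AssertionError, as the test does
                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                timeoutScale *= 2;
            }
        }
    }
}
```

Catching Throwable matters in the test because failed assertTrue checks raise AssertionError, which is not an Exception.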
Code example source: origin: apache/flink
/**
* Tests that a collision on the manual hash throws an Exception.
*/
@Test(expected = IllegalArgumentException.class)
public void testManualHashAssignmentCollisionThrowsException() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
env.disableOperatorChaining();
env.addSource(new NoOpSourceFunction()).uid("source")
.map(new NoOpMapFunction()).uid("source") // Collision
.addSink(new NoOpSinkFunction());
// This call is necessary to generate the job graph
env.getStreamGraph().getJobGraph();
}
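The collision test expects an IllegalArgumentException when two operators are given the same uid ("source"). In Flink the check happens while the JobGraph is generated, hence the getJobGraph() call; the hypothetical registry below only models the uniqueness rule itself, failing on the first duplicate uid.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry enforcing that each uid belongs to exactly one operator.
final class UidRegistrySketch {
    private final Map<String, String> ownerByUid = new HashMap<>();

    // Records that `operator` uses `uid`; throws on the first duplicate.
    void assign(String operator, String uid) {
        String previous = ownerByUid.putIfAbsent(uid, operator);
        if (previous != null) {
            throw new IllegalArgumentException(
                "uid '" + uid + "' is used by both " + previous + " and " + operator);
        }
    }

    int size() {
        return ownerByUid.size();
    }
}
```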
Code example source: origin: apache/flink
@Test
public void testProcessWindowFunctionSideOutput() throws Exception {
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(3);
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Integer> dataStream = see.fromCollection(elements);
OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
SingleOutputStreamOperator<Integer> windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
out.collect(integer);
context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
}
});
windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
windowOperator.addSink(resultSink);
see.execute();
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}
Code example source: origin: apache/flink
/**
* Runs the following program.
* <pre>
* [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
* </pre>
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
// -------------- first vertex, chained to the source ----------------
.filter(new StringRichFilterFunction())
.shuffle()
// -------------- second vertex - the stateful one that also fails ----------------
.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())
// -------------- third vertex - counter and the sink ----------------
.keyBy("prefix")
.map(new OnceFailingPrefixCounter(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() {
@Override
public void invoke(PrefixCount value) throws Exception {
// Do nothing here
}
});
}
Code example source: origin: apache/flink
@Override
public void testProgram(StreamExecutionEnvironment env) {
// set the restart strategy.
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
env.enableCheckpointing(10);
// create and start the file creating thread.
fc = new FileCreator();
fc.start();
// create the monitoring source along with the necessary readers.
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
DataStream<String> inputStream = env.readFile(format, localFsURI,
FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
TestingSinkFunction sink = new TestingSinkFunction();
inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value);
}
}).addSink(sink).setParallelism(1);
}
Code example source: origin: apache/flink
/**
* These check whether timestamps are properly ignored when they are disabled.
*/
@Test
public void testDisabledTimestamps() throws Exception {
final int numElements = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(numElements));
DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(numElements));
source1
.map(new IdentityMap())
.connect(source2).map(new IdentityCoMap())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
.addSink(new DiscardingSink<Integer>());
env.execute();
}
Code example source: origin: apache/flink
/**
* These check whether timestamps are properly assigned at the sources and handled in
* network transmission and between chained operators when timestamps are enabled.
*/
@Test
public void testTimestampHandling() throws Exception {
final int numElements = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, numElements));
DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, numElements));
source1
.map(new IdentityMap())
.connect(source2).map(new IdentityCoMap())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
.addSink(new DiscardingSink<Integer>());
env.execute();
}
Code example source: origin: apache/flink
@Test
public void testBoundsAreInclusiveByDefault() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
Tuple2.of("key", 0),
Tuple2.of("key", 1),
Tuple2.of("key", 2)
).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
Tuple2.of("key", 0),
Tuple2.of("key", 1),
Tuple2.of("key", 2)
).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
streamOne.keyBy(new Tuple2KeyExtractor())
.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
.between(Time.milliseconds(0), Time.milliseconds(2))
.process(new CombineToStringJoinFunction())
.addSink(new ResultSink());
env.execute();
expectInAnyOrder(
"(key,0):(key,0)",
"(key,0):(key,1)",
"(key,0):(key,2)",
"(key,1):(key,1)",
"(key,1):(key,2)",
"(key,2):(key,2)"
);
}
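The last test checks that the bounds passed to between(Time.milliseconds(0), Time.milliseconds(2)) are inclusive on both ends. Assuming each element's timestamp is its integer field (as the name AscendingTuple2TimestampExtractor suggests) and that the join function formats pairs as "left:right", a nested-loop sketch with hypothetical names reproduces the six expected pairs. It ignores watermarks and state cleanup, which the real interval join handles.

```java
import java.util.ArrayList;
import java.util.List;

// Nested-loop keyed interval join with inclusive bounds:
// left l matches right r when l.ts + lower <= r.ts <= l.ts + upper.
final class IntervalJoinSketch {

    static final class Event {
        final String key;
        final long ts;

        Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    static List<String> join(List<Event> left, List<Event> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inBounds = r.ts >= l.ts + lower && r.ts <= l.ts + upper; // both ends inclusive
                if (sameKey && inBounds) {
                    out.add(l + ":" + r);
                }
            }
        }
        return out;
    }
}
```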