本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.addSink()
方法的一些代码示例,展示了KeyedStream.addSink()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KeyedStream.addSink()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:addSink
暂无
代码示例来源:origin: apache/flink
@Test
public void testPrimitiveKeyAcceptance() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setMaxParallelism(1);
DataStream<Integer> input = env.fromElements(new Integer(10000));
KeyedStream<Integer, Object> keyedStream = input.keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return value;
}
});
keyedStream.addSink(new SinkFunction<Integer>() {
@Override
public void invoke(Integer value) throws Exception {
Assert.assertEquals(10000L, (long) value);
}
});
}
代码示例来源:origin: apache/flink
@Test
public void testPOJOWithNestedArrayAndHashCodeWorkAround() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<POJOWithHashCode> input = env.fromElements(
new POJOWithHashCode(new int[] {1, 2}));
input.keyBy(new KeySelector<POJOWithHashCode, POJOWithHashCode>() {
@Override
public POJOWithHashCode getKey(POJOWithHashCode value) throws Exception {
return value;
}
}).addSink(new SinkFunction<POJOWithHashCode>() {
@Override
public void invoke(POJOWithHashCode value) throws Exception {
Assert.assertEquals(value.getId(), new int[]{1, 2});
}
});
}
代码示例来源:origin: apache/flink
public static void main(final String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final String outputPath = params.getRequired("outputPath");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(5000L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
PrintStream out = new PrintStream(stream);
out.println(element.f1);
})
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
// generate data, shuffle, sink
env.addSource(new Generator(10, 10, 60))
.keyBy(0)
.addSink(sink);
env.execute("StreamingFileSinkProgram");
}
代码示例来源:origin: apache/flink
.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());
代码示例来源:origin: apache/flink
.addSink(new MinEvictingQueueSink());
.addSink(new SumEvictingQueueSink());
.addSink(new FoldEvictingQueueSink());
代码示例来源:origin: apache/flink
private static void runPartitioningProgram(int parallelism) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.getConfig().enableObjectReuse();
env.setBufferTimeout(5L);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env
.addSource(new TimeStampingSource())
.map(new IdMapper<Tuple2<Long, Long>>())
.keyBy(0)
.addSink(new TimestampingSink());
env.execute("Partitioning Program");
}
代码示例来源:origin: apache/flink
.addSink(new ToPartitionFileSink(partitionFiles));
内容来源于网络,如有侵权,请联系作者删除!