Usage and code examples of org.apache.flink.streaming.api.datastream.KeyedStream.addSink()

Reposted by x33g5p2x on 2022-01-23, category: Other

This article collects Java code examples of the org.apache.flink.streaming.api.datastream.KeyedStream.addSink() method and shows how KeyedStream.addSink() is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are fairly representative and should be useful as references. Details of KeyedStream.addSink() are as follows:
Package: org.apache.flink.streaming.api.datastream
Class: KeyedStream
Method: addSink

About KeyedStream.addSink

Adds the given SinkFunction to the stream as a sink operator that consumes every element; it returns a DataStreamSink and is typically the terminal operation of a pipeline. Only streams with sinks attached are executed when StreamExecutionEnvironment.execute() is called.
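As a minimal sketch of the pattern the examples below share (assuming Flink's streaming API is on the classpath; the element values and job name are illustrative, not from the original article):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class AddSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("a", "b", "a");
        // Key the stream by the element itself, then attach a sink
        // that consumes each element of the keyed stream.
        KeyedStream<String, String> keyed = words.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) {
                return value;
            }
        });
        keyed.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) throws Exception {
                System.out.println(value);
            }
        });
        // Nothing runs until execute() is called on the environment.
        env.execute("addSink sketch");
    }
}
```

Note that several of the test snippets below never call env.execute(), so their sinks are only type-checked, not actually run.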

Code examples

Code example source: apache/flink

@Test
public void testPrimitiveKeyAcceptance() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  env.setMaxParallelism(1);
  DataStream<Integer> input = env.fromElements(new Integer(10000));
  KeyedStream<Integer, Object> keyedStream = input.keyBy(new KeySelector<Integer, Object>() {
    @Override
    public Object getKey(Integer value) throws Exception {
      return value;
    }
  });
  keyedStream.addSink(new SinkFunction<Integer>() {
    @Override
    public void invoke(Integer value) throws Exception {
      Assert.assertEquals(10000L, (long) value);
    }
  });
}

Code example source: apache/flink

@Test
public void testPOJOWithNestedArrayAndHashCodeWorkAround() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<POJOWithHashCode> input = env.fromElements(
      new POJOWithHashCode(new int[] {1, 2}));
  input.keyBy(new KeySelector<POJOWithHashCode, POJOWithHashCode>() {
    @Override
    public POJOWithHashCode getKey(POJOWithHashCode value) throws Exception {
      return value;
    }
  }).addSink(new SinkFunction<POJOWithHashCode>() {
    @Override
    public void invoke(POJOWithHashCode value) throws Exception {
      // assertEquals on two arrays compares references; use assertArrayEquals
      // to compare contents element by element.
      Assert.assertArrayEquals(new int[]{1, 2}, value.getId());
    }
  });
}
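A side note on the assertion in this excerpt: `Assert.assertEquals` on two arrays falls back to `Object.equals`, which compares references rather than contents, so equal-valued arrays do not compare equal. The difference can be shown with a stdlib-only sketch:

```java
import java.util.Arrays;

public class ArrayEqualityDemo {
    public static void main(String[] args) {
        int[] a = {1, 2};
        int[] b = {1, 2};
        // equals() on arrays is inherited from Object: reference identity.
        System.out.println(a.equals(b));          // false
        // Arrays.equals compares element by element.
        System.out.println(Arrays.equals(a, b));  // true
    }
}
```

This is why JUnit provides `assertArrayEquals` for content comparison of arrays.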

Code example source: apache/flink

public static void main(final String[] args) throws Exception {
  final ParameterTool params = ParameterTool.fromArgs(args);
  final String outputPath = params.getRequired("outputPath");
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  env.enableCheckpointing(5000L);
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
  final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
    .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
      PrintStream out = new PrintStream(stream);
      out.println(element.f1);
    })
    .withBucketAssigner(new KeyBucketAssigner())
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
  // generate data, shuffle, sink
  env.addSource(new Generator(10, 10, 60))
    .keyBy(0)
    .addSink(sink);
  env.execute("StreamingFileSinkProgram");
}
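The row-format `Encoder` lambda above writes each record's second field (`f1`) followed by a newline through a `PrintStream` wrapped around the bucket's output stream. Its effect on the stream can be illustrated with plain `java.io` (no Flink types; the tuples are replaced by plain int values here):

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class EncoderSketch {
    public static void main(String[] args) {
        // Stands in for the part-file output stream the Encoder receives.
        ByteArrayOutputStream bucket = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(bucket);
        // Mimics the Encoder body: one field per line.
        for (int f1 : new int[] {7, 8, 9}) {
            out.println(f1);
        }
        out.flush();
        System.out.print(bucket.toString());
    }
}
```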

Code example source: apache/flink

.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());

Code example source: apache/flink

.addSink(new MinEvictingQueueSink());
.addSink(new SumEvictingQueueSink());
.addSink(new FoldEvictingQueueSink());

Code example source: apache/flink

private static void runPartitioningProgram(int parallelism) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(parallelism);
  env.getConfig().enableObjectReuse();
  env.setBufferTimeout(5L);
  env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
  env
    .addSource(new TimeStampingSource())
    .map(new IdMapper<Tuple2<Long, Long>>())
    .keyBy(0)
    .addSink(new TimestampingSink());
  env.execute("Partitioning Program");
}

Code example source: apache/flink

.addSink(new ToPartitionFileSink(partitionFiles));
