Usage and code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.addSink() method


This article collects Java code examples for the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.addSink() method, showing how addSink() is used in practice. The examples were extracted from selected open-source projects on GitHub, Stack Overflow and Maven, so they carry real-world reference value and should be of some help. Details of SingleOutputStreamOperator.addSink() are as follows:
Package: org.apache.flink.streaming.api.datastream
Class: SingleOutputStreamOperator
Method: addSink

About SingleOutputStreamOperator.addSink

addSink(SinkFunction<T>) is inherited from DataStream. It attaches the given SinkFunction to the stream as a terminal operator and returns a DataStreamSink<T>, so no further transformations can be chained onto the result; the pipeline only runs once StreamExecutionEnvironment.execute() is called.
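
Below is a minimal sketch of the typical call pattern, assuming a local execution environment; the element values, the doubling map and the PrintSinkFunction sink are illustrative choices rather than part of the examples that follow:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class AddSinkSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3)
      // map() returns a SingleOutputStreamOperator<Integer>
      .map(i -> i * 2)
      // addSink() terminates the chain with a DataStreamSink<Integer>;
      // PrintSinkFunction simply prints each record to stdout
      .addSink(new PrintSinkFunction<>());
    env.execute("addSink sketch");
  }
}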

Code examples

Code example source: apache/flink

@SuppressWarnings({"rawtypes", "unchecked"})
private static Integer createDownStreamId(ConnectedStreams dataStream) {
  SingleOutputStreamOperator<?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Object map1(Tuple2<Long, Long> value) {
      return null;
    }
    @Override
    public Object map2(Tuple2<Long, Long> value) {
      return null;
    }
  });
  coMap.addSink(new DiscardingSink());
  return coMap.getId();
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());

    stream
      .keyBy(0)
      .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
      .reduce(new SummingReducer())

      // alternative: use an apply function which does not pre-aggregate
//            .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
//            .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
//            .apply(new SummingWindowFunction())

      .addSink(new SinkFunction<Tuple2<Long, Long>>() {
        @Override
        public void invoke(Tuple2<Long, Long> value) {
        }
      });

    env.execute();
  }

Code example source: apache/flink

/**
 * Tests that there are no collisions with two identical intermediate nodes connected to the
 * same predecessor.
 *
 * <pre>
 *             /-> [ (map) ] -> [ (sink) ]
 * [ (src) ] -+
 *             \-> [ (map) ] -> [ (sink) ]
 * </pre>
 */
@Test
public void testNodeHashIdenticalNodes() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.disableOperatorChaining();
  DataStream<String> src = env.addSource(new NoOpSourceFunction());
  src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
  src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
  JobGraph jobGraph = env.getStreamGraph().getJobGraph();
  Set<JobVertexID> vertexIds = new HashSet<>();
  for (JobVertex vertex : jobGraph.getVertices()) {
    assertTrue(vertexIds.add(vertex.getID()));
  }
}

Code example source: apache/flink

@Test
public void test() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  env.addSource(new TestSource()).map(new TestMap()).addSink(new DiscardingSink<Integer>());
  env.execute();
  assertNotEquals(srcContext, mapContext);
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  String outputPath = params.getRequired("outputPath");
  StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3,
      Time.of(10, TimeUnit.SECONDS)
    ));
  sEnv.enableCheckpointing(4000);
  final int idlenessMs = 10;
  // define bucketing sink to emit the result
  BucketingSink<Tuple4<Integer, Long, Integer, String>> sink = new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath)
    .setBucketer(new KeyBucketer());
  // generate data, shuffle, perform stateful operation, sink
  sEnv.addSource(new Generator(10, idlenessMs, 60))
    .keyBy(0)
    .map(new SubtractingMapper(-1L * idlenessMs))
    .addSink(sink);
  sEnv.execute();
}

Code example source: apache/flink

/**
 * Test ProcessFunction side output.
 */
@Test
public void testProcessFunctionSideOutput() throws Exception {
  final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
  TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
  TestListResultSink<Integer> resultSink = new TestListResultSink<>();
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.setParallelism(3);
  DataStream<Integer> dataStream = see.fromCollection(elements);
  SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
      .process(new ProcessFunction<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void processElement(
            Integer value, Context ctx, Collector<Integer> out) throws Exception {
          out.collect(value);
          ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
        }
      });
  passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
  passThroughtStream.addSink(resultSink);
  see.execute();
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult());
  assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
}

Code example source: apache/flink

/**
 * Tests that a manual hash at the beginning of a chain is accepted.
 */
@Test
public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.addSource(new NoOpSourceFunction()).uid("source")
      .map(new NoOpMapFunction())
      .addSink(new NoOpSinkFunction());
  env.getStreamGraph().getJobGraph();
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    final String checkpointDir = pt.getRequired("checkpoint.dir");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(checkpointDir));
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.enableCheckpointing(1000L);
    env.getConfig().disableGenericTypes();

    env.addSource(new MySource()).uid("my-source")
        .keyBy(anInt -> 0)
        .map(new MyStatefulFunction()).uid("my-map")
        .addSink(new DiscardingSink<>()).uid("my-sink");
    env.execute();
  }

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
  TtlTestConfig config = TtlTestConfig.fromArgs(pt);
  StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
    .cleanupIncrementally(5, true)
    .cleanupFullSnapshot()
    .build();
  env
    .addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
    .name("TtlStateUpdateSource")
    .keyBy(TtlStateUpdate::getKey)
    .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
    .name("TtlVerifyUpdateFunction")
    .addSink(new PrintSinkFunction<>())
    .name("PrintFailedVerifications");
  env.execute("State TTL test job");
}

Code example source: apache/flink

/**
 * Tests that a manual hash for an intermediate chain node is accepted.
 */
@Test
public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.addSource(new NoOpSourceFunction())
      // Intermediate chained node
      .map(new NoOpMapFunction()).uid("map")
      .addSink(new NoOpSinkFunction());
  env.getStreamGraph().getJobGraph();
}

Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
  final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
  final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
  final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
  env.enableCheckpointing(200);
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // first vertex, chained to the source
      // this filter throttles the flow until at least one checkpoint
      // is complete, to make sure this program does not run without checkpoints
      .filter(new StringRichFilterFunction())
          // -------------- second vertex - one-to-one connected ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
          // -------------- third vertex - reducer and the sink ----------------
      .keyBy("prefix")
      .flatMap(new OnceFailingAggregator(failurePos))
      .addSink(new ValidatingSink());
}

Code example source: apache/flink

@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
  int numRetries = 5;
  int timeoutScale = 1;
  for (int numRetry = 0; numRetry < numRetries; numRetry++) {
    try {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      iterated = new boolean[parallelism];
      DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism * 2, false))
          .map(noOpBoolMap).name("ParallelizeMap");
      IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
      DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(noOpBoolMap);
      iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink());
      iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
      env.execute();
      for (boolean iter : iterated) {
        assertTrue(iter);
      }
      break; // success
    } catch (Throwable t) {
      LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
      if (numRetry >= numRetries - 1) {
        throw t;
      } else {
        timeoutScale *= 2;
      }
    }
  }
}

Code example source: apache/flink

/**
 * Tests that a collision on the manual hash throws an Exception.
 */
@Test(expected = IllegalArgumentException.class)
public void testManualHashAssignmentCollisionThrowsException() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.disableOperatorChaining();
  env.addSource(new NoOpSourceFunction()).uid("source")
      .map(new NoOpMapFunction()).uid("source") // Collision
      .addSink(new NoOpSinkFunction());
  // This call is necessary to generate the job graph
  env.getStreamGraph().getJobGraph();
}

Code example source: apache/flink

@Test
public void testProcessdWindowFunctionSideOutput() throws Exception {
  TestListResultSink<Integer> resultSink = new TestListResultSink<>();
  TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.setParallelism(3);
  see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Integer> dataStream = see.fromCollection(elements);
  OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
  SingleOutputStreamOperator<Integer> windowOperator = dataStream
      .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
      .keyBy(new TestKeySelector())
      .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
      .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
          out.collect(integer);
          context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
        }
      });
  windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
  windowOperator.addSink(resultSink);
  see.execute();
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
  assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}

Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // -------------- first vertex, chained to the source ----------------
      .filter(new StringRichFilterFunction())
      .shuffle()
      // -------------- second vertex - the stateful one that also fails ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - counter and the sink ----------------
      .keyBy("prefix")
      .map(new OnceFailingPrefixCounter(NUM_STRINGS))
      .addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
          // Do nothing here
        }
      });
}

Code example source: apache/flink

@Override
public void testProgram(StreamExecutionEnvironment env) {
  // set the restart strategy.
  env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
  env.enableCheckpointing(10);
  // create and start the file creating thread.
  fc = new FileCreator();
  fc.start();
  // create the monitoring source along with the necessary readers.
  TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
  format.setFilesFilter(FilePathFilter.createDefaultFilter());
  DataStream<String> inputStream = env.readFile(format, localFsURI,
    FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
  TestingSinkFunction sink = new TestingSinkFunction();
  inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
      out.collect(value);
    }
  }).addSink(sink).setParallelism(1);
}

Code example source: apache/flink

/**
 * These check whether timestamps are properly ignored when they are disabled.
 */
@Test
public void testDisabledTimestamps() throws Exception {
  final int numElements = 10;
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  env.setParallelism(PARALLELISM);
  env.getConfig().disableSysoutLogging();
  DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(numElements));
  DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(numElements));
  source1
      .map(new IdentityMap())
      .connect(source2).map(new IdentityCoMap())
      .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
      .addSink(new DiscardingSink<Integer>());
  env.execute();
}

Code example source: apache/flink

/**
 * These check whether timestamps are properly assigned at the sources and handled in
 * network transmission and between chained operators when timestamps are enabled.
 */
@Test
public void testTimestampHandling() throws Exception {
  final int numElements = 10;
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(PARALLELISM);
  env.getConfig().disableSysoutLogging();
  DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, numElements));
  DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, numElements));
  source1
      .map(new IdentityMap())
      .connect(source2).map(new IdentityCoMap())
      .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
      .addSink(new DiscardingSink<Integer>());
  env.execute();
}

Code example source: apache/flink

@Test
public void testBoundsAreInclusiveByDefault() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  streamOne.keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), Time.milliseconds(2))
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
  env.execute();
  expectInAnyOrder(
    "(key,0):(key,0)",
    "(key,0):(key,1)",
    "(key,0):(key,2)",
    "(key,1):(key,1)",
    "(key,1):(key,2)",
    "(key,2):(key,2)"
  );
}
