Usage of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator class, with code examples

Reposted by x33g5p2x on 2022-01-30

This article collects code examples for the Java class org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator and shows how the class is used in practice. The examples were extracted from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the class:

Package: org.apache.flink.streaming.api.datastream
Class name: SingleOutputStreamOperator

About SingleOutputStreamOperator

SingleOutputStreamOperator represents a user-defined transformation applied on a DataStream with one predefined output type.
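Before the collected examples, here is a minimal, self-contained sketch of the class in use. It assumes a Flink dependency on the classpath, and all operator names and uids in it are illustrative, not taken from any project below. Every DataStream transformation returns a SingleOutputStreamOperator, whose fluent methods (`name`, `uid`, `setParallelism`, `returns`) configure that single operator:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SingleOutputExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // map() returns a SingleOutputStreamOperator<Integer>; each fluent call
        // below configures the same operator before the job graph is built.
        SingleOutputStreamOperator<Integer> lengths = env
                .fromElements("a", "bb", "ccc")
                .map((MapFunction<String, Integer>) String::length)
                .returns(Integer.class)        // declare the output type explicitly
                .name("StringLengthMapper")    // display name in the web UI
                .uid("string-length-mapper")   // stable id for savepoint compatibility
                .setParallelism(1);

        lengths.print();
        env.execute("SingleOutputStreamOperator example");
    }
}
```

Assigning an explicit `uid` per stateful operator, as most examples below do, is what lets a job restore state from a savepoint after the topology changes.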

Code examples

Code example source: apache/flink

// Incomplete excerpt: chained operators given explicit .name()/.uid() assignments;
// the transformations between the marked elisions are truncated in the original.
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(
// ...
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002");
// ...
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
// ...
.returns(Event.class)
.name(OPERATOR_STATE_OPER_NAME).uid("0004");
// ...
}).name(TIME_WINDOW_OPER_NAME).uid("0005");
// ...
  .setParallelism(1)
  .name(FAILURE_MAPPER_NAME).uid("0006");
// ...
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.addSink(new PrintSinkFunction<>())
.uid("008");
// ...
.name(SLIDING_WINDOW_AGG_NAME)
.uid("009");
// ...
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>())
.uid("011");

Code example source: apache/flink

// Excerpt: merging a timed-out side output back into the main stream; the
// stream ahead of .connect(...) is truncated in the original snippet.
final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
// ...
    .connect(timedOutStream)
    .map(new CoMapTimeout<>())
    .returns(outTypeInfo);

Code example source: apache/flink

@Test
public void testUserProvidedHashingOnChainSupported() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
      .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
      .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
  env.getStreamGraph().getJobGraph();
}

Code example source: apache/flink

private static KeyedStream<Event, Integer> applyTestStatefulOperator(
  String name,
  JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
  KeyedStream<Event, Integer> source,
  List<TypeSerializer<ComplexPayload>> stateSer,
  List<Class<ComplexPayload>> stateClass) {
  return source
    .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
    .name(name)
    .uid(name)
    .returns(Event.class)
    .keyBy(Event::getKey);
}

Code example source: apache/flink

public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
  return input
    .map(new StatefulStringStoringMap(mode, "second"))
    .setParallelism(4)
    .uid("second");
}

Code example source: apache/flink

private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
  DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);

  stream
      .filter(ignored -> false).setParallelism(parallelism)
      .startNewChain()
      .print().setParallelism(parallelism);
}

Code example source: king/bravo

public DataStream<String> constructTestPipeline(DataStream<String> source) {
  return source
      .map(Integer::parseInt)
      .returns(Integer.class)
      .keyBy(i -> i)
      .map(new StatefulCounter())
      .uid("hello")
      .map(Tuple2::toString)
      .returns(String.class);
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

Code example source: apache/flink

@Test
public void testSideOutputWithMultipleConsumers() throws Exception {
  final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
  TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
  TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
  TestListResultSink<Integer> resultSink = new TestListResultSink<>();
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(3);
  DataStream<Integer> dataStream = env.fromCollection(elements);
  SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
      .process(new ProcessFunction<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void processElement(
            Integer value, Context ctx, Collector<Integer> out) throws Exception {
          out.collect(value);
          ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
        }
      });
  passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1);
  passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2);
  passThroughtStream.addSink(resultSink);
  env.execute();
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink2.getSortedResult());
  assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
}

Code example source: apache/flink

env.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
  "custom_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
  "timely_stateful_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());

Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // -------------- first vertex, chained to the source ----------------
      .filter(new StringRichFilterFunction())
      .shuffle()
      // -------------- second vertex - the stateful one that also fails ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - counter and the sink ----------------
      .keyBy("prefix")
      .map(new OnceFailingPrefixCounter(NUM_STRINGS))
      .addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
          // Do nothing here
        }
      });
}

Code example source: apache/flink

@Test
public void testProcessdWindowFunctionSideOutput() throws Exception {
  TestListResultSink<Integer> resultSink = new TestListResultSink<>();
  TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.setParallelism(3);
  see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Integer> dataStream = see.fromCollection(elements);
  OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
  SingleOutputStreamOperator<Integer> windowOperator = dataStream
      .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
      .keyBy(new TestKeySelector())
      .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
      .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
          out.collect(integer);
          context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
        }
      });
  windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
  windowOperator.addSink(resultSink);
  see.execute();
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
  assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}

Code example source: apache/flink

/**
 * Tests that a manual hash at the beginning of a chain is accepted.
 */
@Test
public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.addSource(new NoOpSourceFunction()).uid("source")
      .map(new NoOpMapFunction())
      .addSink(new NoOpSinkFunction());
  env.getStreamGraph().getJobGraph();
}

Code example source: apache/flink

// Incomplete excerpt: several statements of this iteration test are truncated
// in the original snippet.
    .map(noOpIntMap).name("ParallelizeMapShuffle");
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
    .map(noOpIntMap).name("ParallelizeMapRebalance");
DataStream<Integer> head1 = iter1.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
DataStream<Integer> head2 = iter1.map(noOpIntMap).name("IterForwardMap");
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
// ...
    .map(noOpIntMap).name("EvenOddSourceMap")
    .split(new EvenOddOutputSelector());
// ...
    head1.rebalance().map(noOpIntMap).broadcast(), head2.shuffle()));

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
  TtlTestConfig config = TtlTestConfig.fromArgs(pt);
  StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
    .cleanupIncrementally(5, true)
    .cleanupFullSnapshot()
    .build();
  env
    .addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
    .name("TtlStateUpdateSource")
    .keyBy(TtlStateUpdate::getKey)
    .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
    .name("TtlVerifyUpdateFunction")
    .addSink(new PrintSinkFunction<>())
    .name("PrintFailedVerifications");
  env.execute("State TTL test job");
}

Code example source: apache/flink

return null;
}).name("MyMap");
    .windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(10)))
    .fold(0L, new FoldFunction<Long, Long>() {
DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
  private static final long serialVersionUID = 1L;
assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
assertEquals(10,
assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
assertEquals(10, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
map.setParallelism(2);
assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());

Code example source: apache/flink

// Incomplete excerpt: an all-window process function emitting to a side output;
// the window function body is truncated in the original snippet.
    .timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
    .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
      private static final long serialVersionUID = 1L;
      /* ... */
    });
windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
windowOperator.addSink(resultSink);
see.execute();

Code example source: apache/flink

// Incomplete excerpt: async-wait operators followed by a keyed word count; the
// enclosing calls are truncated in the original snippet.
    timeout,
    TimeUnit.MILLISECONDS,
    20).setParallelism(taskNum);
// ...
    timeout,
    TimeUnit.MILLISECONDS,
    20).setParallelism(taskNum);
// ...
    out.collect(new Tuple2<>(value, 1));
}).keyBy(0).sum(1).print();

Code example source: king/bravo

public DataStream<String> constructTestPipeline(DataStream<String> source) {
  return source
      .map(s -> {
        String[] split = s.split(",");
        return Tuple2.of(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
      })
      .returns(new TypeHint<Tuple2<Integer, Integer>>() {})
      .keyBy(0)
      .map(new MapCounter())
      .uid("hello");
}

Code example source: apache/flink

public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    final String checkpointDir = pt.getRequired("checkpoint.dir");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(checkpointDir));
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.enableCheckpointing(1000L);
    env.getConfig().disableGenericTypes();

    env.addSource(new MySource()).uid("my-source")
        .keyBy(anInt -> 0)
        .map(new MyStatefulFunction()).uid("my-map")
        .addSink(new DiscardingSink<>()).uid("my-sink");
    env.execute();
  }
