Usage and code examples for the org.apache.flink.streaming.api.datastream.KeyedStream.process() method

x33g5p2x, reposted on 2022-01-23 in Other

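This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.process() and shows how KeyedStream.process() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of the KeyedStream.process() method:
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: process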

About KeyedStream.process

Applies the given KeyedProcessFunction on the input stream, thereby creating a transformed output stream.

The function is called for every element in the input stream and can produce zero or more output elements. Unlike the DataStream#flatMap(FlatMapFunction) function, this function can also query the time and set timers. When reacting to the firing of set timers, the function can directly emit elements and/or register yet more timers.
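
The behavior described above (one call per element, zero or more outputs, timers that can emit output when they fire) can be illustrated with a small plain-Java model. This is only a sketch and deliberately does not use Flink: the names `KeyedProcessSketch`, `Handler`, `TimerService`, `run`, and `demo` are hypothetical stand-ins, time simply advances with each element's timestamp, and registering further timers from `onTimer` is left out for brevity. The real KeyedProcessFunction additionally provides managed keyed state, watermarks, and side outputs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Toy model of keyed processing with timers. This is NOT the Flink API. */
public class KeyedProcessSketch {

    /** Hypothetical stand-in for a timer service scoped to the current key. */
    public interface TimerService {
        void registerTimer(long timestamp);
    }

    /** Hypothetical stand-in for a KeyedProcessFunction with String keys and String output. */
    public interface Handler {
        void processElement(String key, long timestamp, TimerService timers, Consumer<String> out);

        void onTimer(String key, long timestamp, Consumer<String> out);
    }

    /** Feeds (key, timestamp) events in order and fires timers once time has reached them. */
    public static List<String> run(List<Map.Entry<String, Long>> events, Handler handler) {
        List<String> output = new ArrayList<>();
        // Pending timers, sorted by timestamp; each timestamp maps to the keys that registered it.
        TreeMap<Long, List<String>> timers = new TreeMap<>();
        for (Map.Entry<String, Long> event : events) {
            String key = event.getKey();
            long ts = event.getValue();
            // Simplification: the element's timestamp acts as the current time.
            fireTimersUpTo(ts, timers, handler, output);
            handler.processElement(
                    key,
                    ts,
                    t -> timers.computeIfAbsent(t, x -> new ArrayList<>()).add(key),
                    output::add);
        }
        // End of input: fire every timer that is still pending.
        fireTimersUpTo(Long.MAX_VALUE, timers, handler, output);
        return output;
    }

    private static void fireTimersUpTo(
            long time, TreeMap<Long, List<String>> timers, Handler handler, List<String> output) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                handler.onTimer(key, due.getKey(), output::add);
            }
        }
    }

    /** Example: emit nothing per element, but set one timer per key 10 time units after its first event. */
    public static List<String> demo() {
        Set<String> seen = new HashSet<>(); // hand-rolled per-key state for this sketch
        Handler handler = new Handler() {
            @Override
            public void processElement(String key, long ts, TimerService timers, Consumer<String> out) {
                if (seen.add(key)) {
                    timers.registerTimer(ts + 10);
                }
            }

            @Override
            public void onTimer(String key, long ts, Consumer<String> out) {
                out.accept(key + " timed out at " + ts);
            }
        };
        List<Map.Entry<String, Long>> events =
                List.of(Map.entry("a", 0L), Map.entry("b", 5L), Map.entry("a", 12L));
        return run(events, handler);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `main` prints `[a timed out at 10, b timed out at 15]`: no element produces output directly, and each key's timer emits one record when it fires. A flatMap-style function could not express this, because it has no way to act at a later point in time.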

Code examples

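Code example source: apache/flink (also listed under org.apache.flink/flink-streaming-java, flink-streaming-java_2.10, and flink-streaming-java_2.11)

// Excerpt truncated in the source: the tail of a process(...) overload that takes a processFunction.
true);
return process(processFunction, outType);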

Code example source: apache/flink (also listed under org.apache.flink/flink-streaming-java and flink-streaming-java_2.11)

/**
 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
 *
 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
  TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
      keyedProcessFunction,
      KeyedProcessFunction.class,
      1,
      2,
      TypeExtractor.NO_INDEX,
      getType(),
      Utils.getCallLocationName(),
      true);
  return process(keyedProcessFunction, outType);
}

Code example source: apache/flink

.process(new Tokenizer());

Code example source: apache/flink

.process(new ProcessFunction<Integer, Integer>() {
  private static final long serialVersionUID = 1L;

Code example source: apache/flink

.process(processFunction);

Code example source: apache/flink

return value.f0;
}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
  private static final long serialVersionUID = -805125545438296619L;

Code example source: apache/flink

/**
 * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated to an operator.
 */
@Test
public void testKeyedStreamKeyedProcessTranslation() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStreamSource<Long> src = env.generateSequence(0, 0);
  KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction = new KeyedProcessFunction<Long, Long, Integer>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(Long value, Context ctx, Collector<Integer> out) throws Exception {
      // Do nothing
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
      // Do nothing
    }
  };
  DataStream<Integer> processed = src
      .keyBy(new IdentityKeySelector<Long>())
      .process(keyedProcessFunction);
  processed.addSink(new DiscardingSink<Integer>());
  assertEquals(keyedProcessFunction, getFunctionForDataStream(processed));
  assertTrue(getOperatorForDataStream(processed) instanceof KeyedProcessOperator);
}

Code example source: com.data-artisans.streamingledger/da-streamingledger-runtime-serial (also listed under dataArtisans/da-streamingledger)

  @Override
  public ResultStreams translate(String name, List<InputAndSpec<?, ?>> streamLedgerSpecs) {
    List<OutputTag<?>> sideOutputTags = createSideOutputTags(streamLedgerSpecs);

    // the input stream is a union of different streams.
    KeyedStream<TaggedElement, Boolean> input = union(streamLedgerSpecs)
        .keyBy(unused -> true);

    // main pipeline
    String serialTransactorName = "SerialTransactor(" + name + ")";
    SingleOutputStreamOperator<Void> resultStream = input
        .process(new SerialTransactor(specs(streamLedgerSpecs), sideOutputTags))
        .name(serialTransactorName)
        .uid(serialTransactorName + "___SERIAL_TX")
        .forceNonParallel()
        .returns(Void.class);

    // gather the sideOutputs.
    Map<String, DataStream<?>> output = new HashMap<>();
    for (OutputTag<?> outputTag : sideOutputTags) {
      DataStream<?> rs = resultStream.getSideOutput(outputTag);
      output.put(outputTag.getId(), rs);
    }
    return new ResultStreams(output);
  }
}

Code example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       // events are out of order by max 60 seconds
  final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  // start the data generator
  DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
  DataStream<TaxiRide> longRides = rides
      .keyBy(ride -> ride.rideId)
      .process(new MatchFunction());
  printOrTest(longRides);
  env.execute("Long Taxi Rides");
}

Code example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       // events are out of order by max 60 seconds
  final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  // start the data generator
  DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
  DataStream<TaxiRide> longRides = rides
      .keyBy(r -> r.rideId)
      .process(new MatchFunction());
  printOrTest(longRides);
  env.execute("Long Taxi Rides");
}

Code example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // map to events
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  // sort events
  events.keyBy((ConnectedCarEvent event) -> event.carId)
      .process(new SortFunction())
      .print();
  env.execute("Sort Connected Car Events");
}

Code example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  // set up checkpointing
  env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
  env.enableCheckpointing(1000);
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)));
  DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(input, servingSpeedFactor)));
  DataStream<TaxiRide> longRides = rides
      .filter(new NYCFilter())
      .keyBy((TaxiRide ride) -> ride.rideId)
      .process(new MatchFunction());
  printOrTest(longRides);
  env.execute("Long Taxi Rides (checkpointed)");
}

Code example source: dataArtisans/flink-training-exercises

.process(new ClosestTaxi());
