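This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.process() and shows how KeyedStream.process() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of the KeyedStream.process() method are as follows: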
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: process
Applies the given KeyedProcessFunction on the input stream, thereby creating a transformed output stream.
The function will be called for every element in the input streams and can produce zero or more output elements. Contrary to the DataStream#flatMap(FlatMapFunction) function, this function can also query the time and set timers. When reacting to the firing of set timers the function can directly emit elements and/or register yet more timers.
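The contract described above (one callback per element, zero or more outputs, timers that can emit when they fire) can be illustrated with a small plain-Java model. This is only a sketch and deliberately does not use the Flink API: the names ToyKeyedProcess, Fn, Timers, and advanceTo are hypothetical, and the 10-unit window, the sample events, and the single-threaded time advance are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy, single-threaded model of keyed processing with timers. NOT the Flink API. */
final class ToyKeyedProcess {

    /** Hypothetical stand-in for a keyed process function (String keys, long values). */
    interface Fn {
        void processElement(String key, long value, long timestamp, Timers timers, List<String> out);
        void onTimer(String key, long timestamp, List<String> out);
    }

    /** Timer registry; a timer is identified by its (key, time) pair and is deduplicated. */
    static final class Timers {
        final TreeMap<Long, List<String>> byTime = new TreeMap<>();

        void register(String key, long time) {
            List<String> keys = byTime.computeIfAbsent(time, t -> new ArrayList<>());
            if (!keys.contains(key)) {
                keys.add(key);
            }
        }
    }

    /** Fires every timer whose time is <= the given time, in ascending time order. */
    static void advanceTo(long time, Timers timers, Fn fn, List<String> out) {
        while (!timers.byTime.isEmpty() && timers.byTime.firstKey() <= time) {
            Map.Entry<Long, List<String>> due = timers.byTime.pollFirstEntry();
            for (String key : due.getValue()) {
                fn.onTimer(key, due.getKey(), out);
            }
        }
    }

    static List<String> run() {
        Map<String, Long> sums = new HashMap<>(); // models per-key state: running sum
        Timers timers = new Timers();
        List<String> out = new ArrayList<>();

        Fn fn = new Fn() {
            @Override
            public void processElement(String key, long value, long ts, Timers t, List<String> o) {
                sums.merge(key, value, Long::sum);   // update state, emit nothing yet
                t.register(key, (ts / 10 + 1) * 10); // timer at the end of the 10-unit window
            }

            @Override
            public void onTimer(String key, long ts, List<String> o) {
                o.add(key + "=" + sums.remove(key) + "@" + ts); // emit and clear the state
            }
        };

        fn.processElement("a", 1, 1, timers, out);
        fn.processElement("b", 5, 2, timers, out);
        fn.processElement("a", 2, 3, timers, out);
        advanceTo(10, timers, fn, out);
        fn.processElement("a", 4, 12, timers, out);
        advanceTo(20, timers, fn, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a=3@10, b=5@10, a=4@20]
    }
}
```

In the model, processElement only updates state and registers a timer, while all output is produced in onTimer. In Flink itself the corresponding pieces would be keyed state and the timer service reached through the function's context, which this sketch does not attempt to reproduce.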
Code example source: apache/flink
true);
return process(processFunction, outType);
Code example source: apache/flink
/**
 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
 *
 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
    TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
        keyedProcessFunction,
        KeyedProcessFunction.class,
        1,
        2,
        TypeExtractor.NO_INDEX,
        getType(),
        Utils.getCallLocationName(),
        true);
    return process(keyedProcessFunction, outType);
}
Code example source: apache/flink
.process(new Tokenizer());
Code example source: apache/flink
.process(new ProcessFunction<Integer, Integer>() {
    private static final long serialVersionUID = 1L;
Code example source: apache/flink
.process(processFunction);
Code example source: apache/flink
    return value.f0;
}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
    private static final long serialVersionUID = -805125545438296619L;
Code example source: apache/flink
/**
 * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated to an operator.
 */
@Test
public void testKeyedStreamKeyedProcessTranslation() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Long> src = env.generateSequence(0, 0);

    KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction = new KeyedProcessFunction<Long, Long, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(Long value, Context ctx, Collector<Integer> out) throws Exception {
            // Do nothing
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            // Do nothing
        }
    };

    DataStream<Integer> processed = src
        .keyBy(new IdentityKeySelector<Long>())
        .process(keyedProcessFunction);
    processed.addSink(new DiscardingSink<Integer>());

    assertEquals(keyedProcessFunction, getFunctionForDataStream(processed));
    assertTrue(getOperatorForDataStream(processed) instanceof KeyedProcessOperator);
}
Code example source: com.data-artisans.streamingledger/da-streamingledger-runtime-serial
    @Override
    public ResultStreams translate(String name, List<InputAndSpec<?, ?>> streamLedgerSpecs) {
        List<OutputTag<?>> sideOutputTags = createSideOutputTags(streamLedgerSpecs);

        // the input stream is a union of different streams.
        KeyedStream<TaggedElement, Boolean> input = union(streamLedgerSpecs)
            .keyBy(unused -> true);

        // main pipeline
        String serialTransactorName = "SerialTransactor(" + name + ")";
        SingleOutputStreamOperator<Void> resultStream = input
            .process(new SerialTransactor(specs(streamLedgerSpecs), sideOutputTags))
            .name(serialTransactorName)
            .uid(serialTransactorName + "___SERIAL_TX")
            .forceNonParallel()
            .returns(Void.class);

        // gather the sideOutputs.
        Map<String, DataStream<?>> output = new HashMap<>();
        for (OutputTag<?> outputTag : sideOutputTags) {
            DataStream<?> rs = resultStream.getSideOutput(outputTag);
            output.put(outputTag.getId(), rs);
        }
        return new ResultStreams(output);
    }
}
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    final String input = params.get("input", ExerciseBase.pathToRideData);

    final int maxEventDelay = 60;       // events are out of order by max 60 seconds
    final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(ExerciseBase.parallelism);

    // start the data generator
    DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

    DataStream<TaxiRide> longRides = rides
        .keyBy(ride -> ride.rideId)
        .process(new MatchFunction());

    printOrTest(longRides);

    env.execute("Long Taxi Rides");
}
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    final String input = params.get("input", ExerciseBase.pathToRideData);

    final int maxEventDelay = 60;       // events are out of order by max 60 seconds
    final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(ExerciseBase.parallelism);

    // start the data generator
    DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

    DataStream<TaxiRide> longRides = rides
        .keyBy(r -> r.rideId)
        .process(new MatchFunction());

    printOrTest(longRides);

    env.execute("Long Taxi Rides");
}
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    // read parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.getRequired("input");

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // connect to the data file
    DataStream<String> carData = env.readTextFile(input);

    // map to events
    DataStream<ConnectedCarEvent> events = carData
        .map((String line) -> ConnectedCarEvent.fromString(line))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // sort events
    events.keyBy((ConnectedCarEvent event) -> event.carId)
        .process(new SortFunction())
        .print();

    env.execute("Sort Connected Car Events");
}
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    final String input = params.get("input", ExerciseBase.pathToRideData);

    final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(ExerciseBase.parallelism);

    // set up checkpointing
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
    env.enableCheckpointing(1000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)));

    DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(input, servingSpeedFactor)));

    DataStream<TaxiRide> longRides = rides
        .filter(new NYCFilter())
        .keyBy((TaxiRide ride) -> ride.rideId)
        .process(new MatchFunction());

    printOrTest(longRides);

    env.execute("Long Taxi Rides (checkpointed)");
}
Code example source: dataArtisans/flink-training-exercises
.process(new ClosestTaxi());