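This article collects code examples for the Java class org.apache.flink.streaming.api.datastream.KeyedStream and shows how KeyedStream is used in practice. The examples come mainly from GitHub, Stack Overflow and Maven and were extracted from selected projects, so they are a useful reference. Details of the KeyedStream class:
Package: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream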
Description: A KeyedStream represents a DataStream on which operator state is partitioned by key using a provided KeySelector. Typical operations supported by a DataStream are also possible on a KeyedStream, with the exception of partitioning methods such as shuffle, forward and keyBy.
Reduce-style operations, such as reduce, sum and fold, work on elements that have the same key.
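To make "partitioned by key" concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes a simplified version of Flink's key-group routing (see KeyGroupRangeAssignment): a key is mapped to one of maxParallelism key groups, and a key group is mapped to a subtask with keyGroup * parallelism / maxParallelism. Flink applies a murmur hash to key.hashCode() first; the sketch substitutes Math.floorMod for that step, and KeyRoutingSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of keyed routing: key -> key group -> subtask index.
// Simplification (assumption): Flink hashes key.hashCode() with murmur hash;
// this sketch uses Math.floorMod(key.hashCode(), maxParallelism) instead.
class KeyRoutingSketch {

    // Map a key to one of maxParallelism key groups (always non-negative).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a key group to a parallel operator instance in [0, parallelism).
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("alice", "bob", "carol", "alice");
        for (String k : keys) {
            // Both "alice" records print the same subtask index.
            System.out.println(k + " -> subtask " + subtaskFor(k, 128, 4));
        }
    }
}
```

Because the mapping is a pure function of the key, every record with the same key reaches the same subtask, which is why per-key state can be kept local to that subtask.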
Code example origin: apache/flink
/**
* A thin wrapper layer over {@link KeyedStream#timeWindow(Time)}.
*
* @param size The size of the window.
* @return The python windowed stream {@link PythonWindowedStream}
*/
public PythonWindowedStream time_window(Time size) {
return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size));
}
Code example origin: apache/flink
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
String name,
JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
KeyedStream<Event, Integer> source,
List<TypeSerializer<ComplexPayload>> stateSer,
List<Class<ComplexPayload>> stateClass) {
return source
.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
.name(name)
.uid(name)
.returns(Event.class)
.keyBy(Event::getKey);
}
Code example origin: apache/flink
/**
* Publishes the keyed stream as a queryable ReducingState instance.
*
* @param queryableStateName Name under which to publish the queryable state instance
* @param stateDescriptor State descriptor to create state instance from
* @return Queryable state instance
*/
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
String queryableStateName,
ReducingStateDescriptor<T> stateDescriptor) {
transform("Queryable state: " + queryableStateName,
getType(),
new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
return new QueryableStateStream<>(
queryableStateName,
stateDescriptor,
getKeyType().createSerializer(getExecutionConfig()));
}
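asQueryableState keeps one ReducingState value per key and lets an external client read it. The plain-Java sketch below models only what such state holds: each add() merges the new value into the key's current value with the reduce function, and get() returns the current result. ReducingStateSketch and its methods are hypothetical names, not Flink's state or queryable-state API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of per-key reducing state (hypothetical, not Flink API).
class ReducingStateSketch<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final BinaryOperator<V> reducer;

    ReducingStateSketch(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Merge the incoming value into the key's state; the first value is stored as-is.
    void add(K key, V value) {
        state.merge(key, value, reducer);
    }

    // What a query for this key would see; null if the key was never added.
    V get(K key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        ReducingStateSketch<String, Long> sums = new ReducingStateSketch<>(Long::sum);
        sums.add("a", 1L);
        sums.add("a", 2L);
        sums.add("b", 5L);
        System.out.println(sums.get("a") + " " + sums.get("b"));
    }
}
```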
Code example origin: apache/flink
protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
clean(aggregate), getType().createSerializer(getExecutionConfig()));
return transform("Keyed Aggregation", getType(), operator);
}
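In KeyedStream, the built-in rolling aggregations such as sum, min, max, minBy and maxBy are implemented by handing an AggregationFunction to the aggregate method above, so each incoming element emits the updated aggregate for its key. A difference that often surprises users is max versus maxBy. The plain-Java sketch below models both for the elements of a single key, assuming the behavior of Flink's ComparableAggregator as we understand it: max replaces only the aggregated field and keeps the other fields of the first element, while maxBy emits the whole element holding the maximum (the first one on ties). Reading and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of rolling max(field) vs maxBy(field) for one key.
class MaxVsMaxBySketch {

    // Hypothetical record type used only for illustration.
    static final class Reading {
        final String sensor; // key
        final long time;     // non-aggregated field
        final int value;     // aggregated field

        Reading(String sensor, long time, int value) {
            this.sensor = sensor;
            this.time = time;
            this.value = value;
        }

        @Override
        public String toString() {
            return sensor + "/" + time + "/" + value;
        }
    }

    // max: keep the accumulated record's other fields, only update the aggregated field.
    static List<Reading> rollingMax(List<Reading> in) {
        List<Reading> out = new ArrayList<>();
        Reading acc = null;
        for (Reading r : in) {
            acc = (acc == null) ? r : new Reading(acc.sensor, acc.time, Math.max(acc.value, r.value));
            out.add(acc);
        }
        return out;
    }

    // maxBy: emit the whole record holding the maximum; the earlier record wins on ties.
    static List<Reading> rollingMaxBy(List<Reading> in) {
        List<Reading> out = new ArrayList<>();
        Reading acc = null;
        for (Reading r : in) {
            acc = (acc == null || r.value > acc.value) ? r : acc;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> in = Arrays.asList(
            new Reading("s1", 1, 10), new Reading("s1", 2, 30), new Reading("s1", 3, 20));
        System.out.println(rollingMax(in));   // [s1/1/10, s1/1/30, s1/1/30]
        System.out.println(rollingMaxBy(in)); // [s1/1/10, s1/2/30, s1/2/30]
    }
}
```

With max, the time field stays that of the first record even after a larger value arrives; with maxBy, the record that actually carried the maximum is emitted.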
Code example origin: apache/flink
/**
* Applies a fold transformation on the data stream grouped by
* the given key. The {@link FoldFunction} will receive input
* values based on the key value. Only input values with the same key will
* go to the same folder.
*
* @param folder
* The {@link FoldFunction} that will be called for every element
* of the input values with the same key.
* @param initialValue
* The initialValue passed to the folders for each key.
* @return The transformed DataStream.
*
* @deprecated will be removed in a future version
*/
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) {
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
clean(folder), getType(), Utils.getCallLocationName(), true);
return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
}
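The fold contract above (a per-key accumulator that starts from initialValue and is updated by every element with the same key) can be modelled in plain Java as follows. This is a sketch under assumptions: foldByKey is a hypothetical helper, it emits the updated accumulator once per input element as a rolling keyed fold does, and it ignores state backends, checkpointing and the deprecation of fold itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of a keyed rolling fold; the accumulator type R may differ from T.
class KeyedFoldSketch {

    static <T, K, R> List<R> foldByKey(List<T> input, Function<T, K> keyOf,
                                       R initialValue, BiFunction<R, T, R> folder) {
        Map<K, R> accumulators = new HashMap<>();
        List<R> emitted = new ArrayList<>();
        for (T element : input) {
            K key = keyOf.apply(element);
            // Each key starts from the same initial value.
            R current = accumulators.getOrDefault(key, initialValue);
            R updated = folder.apply(current, element);
            accumulators.put(key, updated);
            emitted.add(updated); // one output per input element
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Key = first letter; fold = append each word to a ">" prefix.
        List<String> words = Arrays.asList("apple", "banana", "avocado");
        System.out.println(foldByKey(words, w -> w.charAt(0), ">", (acc, w) -> acc + w));
    }
}
```

Here "apple" and "avocado" share the key 'a', so the third output continues the accumulator built by the first, while "banana" starts again from ">".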
Code example origin: apache/flink
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
    // ... (lines omitted in the excerpt)
    KeySelector<T, K> keySel = input.getKeySelector();
    // ... (lines omitted in the excerpt)
    @SuppressWarnings({"unchecked", "rawtypes"})
    TypeSerializer<StreamRecord<T>> streamRecordSerializer =
        (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    // ... (lines omitted in the excerpt)
        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
        keySel,
        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
        stateDesc,
        function,
    // ... (lines omitted in the excerpt)
        input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    // ... (lines omitted in the excerpt)
        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
        keySel,
        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
        stateDesc,
        function,
    // ... (lines omitted in the excerpt)
    return input.transform(opName, resultType, operator);
}
Code example origin: apache/flink
// ... (source stream omitted in the excerpt)
    .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    .keyBy(0)
    .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    .keyBy(0)
    .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    .keyBy(0)
    .transform(
        "custom_operator",
        new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
        new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
    .keyBy(0)
    .transform(
        "timely_stateful_operator",
        new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
        // ... (remaining arguments and calls omitted in the excerpt)
Code example origin: apache/flink
/**
* Windows this {@code KeyedStream} into tumbling time windows.
*
* <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
* {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
* set using
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
*
* @param size The size of the window.
*/
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
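Whichever time characteristic timeWindow picks, a tumbling window assigner maps a timestamp to a window start in the same way; only the timestamp source differs (the element's event timestamp or the current processing time). The plain-Java sketch below uses the formula start = ts - (ts - offset + size) % size, which for non-negative timestamps gives the same result as Flink's TimeWindow.getWindowStartWithOffset. windowStart and countPerWindow are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of tumbling-window assignment for a single key.
class TumblingWindowSketch {

    // Start of the tumbling window [start, start + size) that contains timestamp.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Count events per window start (offset 0), ordered by window start.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, 0, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(1_000L, 4_999L, 5_000L, 12_345L);
        // 5-second windows: {0=2, 5000=1, 10000=1}
        System.out.println(countPerWindow(ts, 5_000L));
    }
}
```

For 5-second windows, timestamps 1000 and 4999 fall into the window starting at 0, while 5000 opens the next window; a non-zero offset simply shifts all window boundaries.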
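Code example origin: apache/flink
// ... (preceding calls omitted in the excerpt)
    .process(new ProcessFunction<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        // ... (rest of the anonymous class omitted in the excerpt)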
Code example origin: apache/flink
@Test
public void testNestedPojoFieldAccessor() throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.getConfig().disableObjectReuse();
see.setParallelism(4);
DataStream<Data> dataStream = see.fromCollection(elements);
DataStream<Data> summedStream = dataStream
.keyBy("aaa")
.sum("stats.count")
.keyBy("aaa")
.flatMap(new FlatMapFunction<Data, Data>() {
Data[] first = new Data[3];
@Override
public void flatMap(Data value, Collector<Data> out) throws Exception {
if (first[value.aaa] == null) {
first[value.aaa] = value;
if (value.stats.count != 123) {
throw new RuntimeException("Expected stats.count to be 123");
}
} else {
if (value.stats.count != 2 * 123) {
throw new RuntimeException("Expected stats.count to be 2 * 123");
}
}
}
});
summedStream.print();
see.execute();
}
Code example origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey);
List<TypeSerializer<ComplexPayload>> stateSer =
Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
afterStatefulOperations
.flatMap(createSemanticsCheckMapper(pt))
.name("SemanticsCheckMapper")
.addSink(new PrintSinkFunction<>());
env.execute("General purpose test job");
}
Code example origin: apache/flink
// ... (method signature and preceding checks omitted in the excerpt)
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
// ... (start of the operator construction omitted in the excerpt)
    lowerBoundInclusive,
    upperBoundInclusive,
    left.getType().createSerializer(left.getExecutionConfig()),
    right.getType().createSerializer(right.getExecutionConfig()),
    cleanedUdf
);
// ... (lines omitted in the excerpt)
    .connect(right)
    .keyBy(keySelector1, keySelector2)
    .transform("Interval Join", outputType, operator);
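The interval join above pairs elements of two keyed streams that share a key and whose timestamps are close. Per Flink's documented semantics, a left element at time tL matches a right element at time tR when tL + lowerBound <= tR <= tL + upperBound, and either bound can be made exclusive. The plain-Java sketch below models just that predicate for a single key with a naive nested loop; the real operator buffers both sides in keyed state and cleans them up as watermarks advance. IntervalJoinSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of interval-join matching for one key (hypothetical, not Flink API).
class IntervalJoinSketch {

    // True if rightTs lies in [leftTs + lowerBound, leftTs + upperBound],
    // with each end optionally exclusive.
    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound,
                           boolean lowerInclusive, boolean upperInclusive) {
        long lo = leftTs + lowerBound;
        long hi = leftTs + upperBound;
        boolean aboveLower = lowerInclusive ? rightTs >= lo : rightTs > lo;
        boolean belowUpper = upperInclusive ? rightTs <= hi : rightTs < hi;
        return aboveLower && belowUpper;
    }

    // Naive nested-loop join over the timestamps of one key; pairs are "left-right".
    static List<String> join(List<Long> left, List<Long> right, long lowerBound, long upperBound,
                             boolean lowerInclusive, boolean upperInclusive) {
        List<String> pairs = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (matches(l, r, lowerBound, upperBound, lowerInclusive, upperInclusive)) {
                    pairs.add(l + "-" + r);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Bounds [-2, +1], both inclusive: left@10 pairs with right@8 and right@11.
        System.out.println(join(Arrays.asList(10L), Arrays.asList(7L, 8L, 11L, 12L), -2, 1, true, true));
    }
}
```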
Code example origin: apache/flink
// ... (lines omitted in the excerpt)
    .process(new Tokenizer());
// ... (lines omitted in the excerpt)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
Code example origin: apache/flink
/**
* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
*
* <p>The function will be called for every element in the input streams and can produce zero
* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
* function, this function can also query the time and set timers. When reacting to the firing
* of set timers the function can directly emit elements and/or register yet more timers.
*
* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
*
* @param outputType {@link TypeInformation} for the result type of the function.
*
* @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
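The process method above wraps a KeyedProcessFunction in a KeyedProcessOperator, which gives the function per-key state and timers. The plain-Java simulation below illustrates that combination without Flink: each element increments a per-key counter and registers an event-time timer at timestamp + delay, and advancing the watermark fires every timer whose timestamp is at or below it, emitting and clearing the count for that key. Like Flink's timer service, it keeps at most one timer per key and timestamp. KeyedTimerSketch and its methods are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation of per-key state plus event-time timers (hypothetical, not Flink API).
class KeyedTimerSketch {
    private final Map<String, Integer> counts = new HashMap<>();          // per-key state
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();    // timer time -> keys
    private final long delay;
    final List<String> output = new ArrayList<>();

    KeyedTimerSketch(long delay) {
        this.delay = delay;
    }

    // Analogous to processElement: update the key's state and register a timer.
    void processElement(String key, long timestamp) {
        counts.merge(key, 1, Integer::sum);
        // A Set per timestamp deduplicates timers for the same key and time.
        timers.computeIfAbsent(timestamp + delay, t -> new TreeSet<>()).add(key);
    }

    // Analogous to a watermark arriving: fire all timers with timestamp <= watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                Integer count = counts.remove(key); // emit and clear the key's state
                if (count != null) {
                    output.add(key + "@" + due.getKey() + "=" + count);
                }
            }
        }
    }

    public static void main(String[] args) {
        KeyedTimerSketch sketch = new KeyedTimerSketch(10);
        sketch.processElement("a", 1);
        sketch.processElement("a", 3);
        sketch.processElement("b", 2);
        sketch.advanceWatermark(11);
        sketch.advanceWatermark(13);
        System.out.println(sketch.output); // [a@11=2, b@12=1]
    }
}
```

The timer registered for "a" at 13 finds no state left when it fires, because the timer at 11 already emitted and cleared that key's count.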
Code example origin: apache/flink
// ... (lines omitted in the excerpt)
KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;
TypeInformation<?> keyType1 = keyedInput1.getKeyType();
TypeInformation<?> keyType2 = keyedInput2.getKeyType();
if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
    throw new UnsupportedOperationException("Key types of input KeyedStreams " +
        // ... (rest of the message omitted in the excerpt)
}
transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
transform.setStateKeyType(keyType1);
Code example origin: apache/flink
public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
Code example origin: apache/flink
// ... (start of the TypeExtractor call that derives outType omitted in the excerpt)
    1,
    TypeExtractor.NO_INDEX,
    getType(),
    Utils.getCallLocationName(),
    true);
return process(processFunction, outType);
Code example origin: apache/flink
// ... (input stream omitted in the excerpt)
    .connect(npBroadcastStream)
    .process(firstBroadcastFunction).uid("BrProcess1")
    .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
// ... (input stream omitted in the excerpt)
    .connect(pBroadcastStream)
    .process(secondBroadcastFunction).uid("BrProcess2")
    .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
Code example origin: apache/flink
// ... (source stream omitted in the excerpt)
    .map(new OnceFailingPartitionedSum(failurePos))
    .keyBy(0)
    .addSink(new CounterSink());
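Code example origin: apache/flink
// ... (lines omitted in the excerpt)
KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
patternStream = keyedStream.transform(
    "CepOperator",
    outTypeInfo,
    // ... (remaining arguments omitted in the excerpt)
// ... (lines omitted in the excerpt)
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
patternStream = inputStream.keyBy(keySelector).transform(
    "GlobalCepOperator",
    outTypeInfo,
    // ... (remaining arguments omitted in the excerpt)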