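This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.transform() and shows how KeyedStream.transform() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the KeyedStream.transform() method are as follows: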
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: transform
Description: none available
Code example source: apache/flink
/**
 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * @param processFunction The {@link ProcessFunction} that is called for each element
 *                        in the stream.
 * @param outputType {@link TypeInformation} for the result type of the function.
 *
 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 *
 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
 */
@Deprecated
@Override
@Internal
public <R> SingleOutputStreamOperator<R> process(
        ProcessFunction<T, R> processFunction,
        TypeInformation<R> outputType) {
    LegacyKeyedProcessOperator<KEY, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
    return transform("Process", outputType, operator);
}
Code example source: apache/flink
/**
 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
 *
 * @param outputType {@link TypeInformation} for the result type of the function.
 *
 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 */
@Internal
public <R> SingleOutputStreamOperator<R> process(
        KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
        TypeInformation<R> outputType) {
    KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
    return transform("KeyedProcess", outputType, operator);
}
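The Javadoc above says a process function can set timers and emit elements when they fire. The following plain-Java model is only a rough sketch of that idea and does not use Flink. The class KeyedTimerSketch and its methods are hypothetical names invented for illustration, and the model assumes event-time timers that fire once a watermark reaches their timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of per-key event-time timers. This is NOT Flink code;
// all names here are hypothetical and exist only for illustration.
class KeyedTimerSketch {
    // Pending timers: firing timestamp -> keys that registered a timer for it.
    // Using a set per timestamp means a (key, timestamp) pair is stored once.
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    // Called once per element: emits nothing, only registers a timer for the key.
    void processElement(String key, long timestamp, long delay) {
        timers.computeIfAbsent(timestamp + delay, t -> new TreeSet<>()).add(key);
    }

    // Fires, in timestamp order, every timer whose timestamp is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> entry = timers.pollFirstEntry();
            for (String key : entry.getValue()) {
                emitted.add(key + "@" + entry.getKey());
            }
        }
    }

    public static void main(String[] args) {
        KeyedTimerSketch sketch = new KeyedTimerSketch();
        sketch.processElement("a", 0L, 10L);
        sketch.processElement("b", 5L, 10L);
        sketch.processElement("a", 0L, 10L); // same key and time: stored once
        sketch.advanceWatermark(12L);
        System.out.println(sketch.emitted); // [a@10]
        sketch.advanceWatermark(20L);
        System.out.println(sketch.emitted); // [a@10, b@15]
    }
}
```

In a real job the timer bookkeeping is handled by the runtime; the sketch only shows why output can appear later than the element that caused it.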
Code example source: apache/flink
/**
 * Applies a fold transformation on the grouped data stream grouped on by
 * the given key position. The {@link FoldFunction} will receive input
 * values based on the key value. Only input values with the same key will
 * go to the same folder.
 *
 * @param folder
 *            The {@link FoldFunction} that will be called for every element
 *            of the input values with the same key.
 * @param initialValue
 *            The initialValue passed to the folders for each key.
 * @return The transformed DataStream.
 *
 * @deprecated will be removed in a future version
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) {
    TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
            clean(folder), getType(), Utils.getCallLocationName(), true);
    return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
}
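To make the per-key behaviour described in the fold Javadoc concrete, here is a minimal plain-Java sketch that does not use Flink. The class KeyedFoldSketch and the method rollingFold are hypothetical names. The sketch assumes that every key starts from the same initial value and that a folded result is emitted for each incoming element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of a keyed rolling fold. This is NOT Flink code;
// the class and method names are hypothetical.
class KeyedFoldSketch {

    // For each input, fold it into the accumulator of its key and emit the new accumulator.
    static <T, K, R> List<R> rollingFold(
            List<T> inputs,
            Function<T, K> keySelector,
            R initialValue,
            BiFunction<R, T, R> folder) {
        Map<K, R> accumulatorPerKey = new HashMap<>();
        List<R> emitted = new ArrayList<>();
        for (T value : inputs) {
            K key = keySelector.apply(value);
            // The first element of a key starts from the shared initial value.
            // Assumption: initialValue is immutable, so sharing it is safe.
            R previous = accumulatorPerKey.getOrDefault(key, initialValue);
            R current = folder.apply(previous, value);
            accumulatorPerKey.put(key, current);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Key by parity, fold into a string that records the values seen per key.
        List<String> out = rollingFold(
                Arrays.asList(1, 2, 3, 4), v -> v % 2, "start", (acc, v) -> acc + "-" + v);
        System.out.println(out); // [start-1, start-2, start-1-3, start-2-4]
    }
}
```

Values with different keys never share an accumulator, which mirrors the statement that only input values with the same key go to the same folder.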
Code example source: apache/flink
protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
    StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
            clean(aggregate), getType().createSerializer(getExecutionConfig()));
    return transform("Keyed Aggregation", getType(), operator);
}
Code example source: apache/flink
/**
 * Applies a reduce transformation on the grouped data stream grouped on by
 * the given key position. The {@link ReduceFunction} will receive input
 * values based on the key value. Only input values with the same key will
 * go to the same reducer.
 *
 * @param reducer
 *            The {@link ReduceFunction} that will be called for every
 *            element of the input values with the same key.
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
    return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
            clean(reducer), getType().createSerializer(getExecutionConfig())));
}
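The reduce Javadoc above can be illustrated with a small plain-Java model that does not use Flink. The class KeyedReduceSketch and the method rollingReduce are hypothetical names. The sketch assumes a rolling reduce: the first value of a key is emitted unchanged, and each later value is combined with the previous result for that key before being emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of a keyed rolling reduce. This is NOT Flink code;
// the class and method names are hypothetical.
class KeyedReduceSketch {

    // For each input, combine it with the previous result of its key and emit the new result.
    static <T, K> List<T> rollingReduce(
            List<T> inputs,
            Function<T, K> keySelector,
            BinaryOperator<T> reducer) {
        Map<K, T> resultPerKey = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T value : inputs) {
            K key = keySelector.apply(value);
            T previous = resultPerKey.get(key);
            // The first value seen for a key passes through unchanged.
            T current = (previous == null) ? value : reducer.apply(previous, value);
            resultPerKey.put(key, current);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Key by parity and sum: odd and even values accumulate independently.
        List<Integer> out = rollingReduce(Arrays.asList(1, 2, 3, 4, 5), v -> v % 2, Integer::sum);
        System.out.println(out); // [1, 2, 4, 6, 9]
    }
}
```

One output is produced per input, so downstream consumers see every intermediate result rather than a single final total per key.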
Code example source: apache/flink
/**
 * Publishes the keyed stream as a queryable FoldingState instance.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(
        String queryableStateName,
        FoldingStateDescriptor<T, ACC> stateDescriptor) {
    transform("Queryable state: " + queryableStateName,
            getType(),
            new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    return new QueryableStateStream<>(
            queryableStateName,
            stateDescriptor,
            getKeyType().createSerializer(getExecutionConfig()));
}
Code example source: apache/flink
KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
patternStream = keyedStream.transform(
"CepOperator",
outTypeInfo,
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
patternStream = inputStream.keyBy(keySelector).transform(
"GlobalCepOperator",
outTypeInfo,
Code example source: apache/flink
/**
 * Publishes the keyed stream as a queryable ReducingState instance.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ReducingStateDescriptor<T> stateDescriptor) {
    transform("Queryable state: " + queryableStateName,
            getType(),
            new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    return new QueryableStateStream<>(
            queryableStateName,
            stateDescriptor,
            getKeyType().createSerializer(getExecutionConfig()));
}
Code example source: apache/flink
/**
 * Publishes the keyed stream as a queryable ValueState instance.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ValueStateDescriptor<T> stateDescriptor) {
    transform("Queryable state: " + queryableStateName,
            getType(),
            new QueryableValueStateOperator<>(queryableStateName, stateDescriptor));
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    return new QueryableStateStream<>(
            queryableStateName,
            stateDescriptor,
            getKeyType().createSerializer(getExecutionConfig()));
}
Code example source: apache/flink
return input.transform(opName, resultType, operator);
Code example source: apache/flink
return value.f0;
}).transform(
"TestAggregatingOperator",
BasicTypeInfo.STRING_TYPE_INFO,
Code example source: apache/flink
return input.transform(opName, resultType, operator).forceNonParallel();
Code example source: apache/flink
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
"custom_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
Code example source: apache/flink
.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
"custom_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
"timely_stateful_operator",
new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),