Usage and code examples for org.apache.flink.streaming.api.datastream.KeyedStream

Reposted by x33g5p2x on 2022-01-23.

This article collects a number of Java code examples for org.apache.flink.streaming.api.datastream.KeyedStream and shows how the class is used in practice. The examples were extracted from selected projects on GitHub, Stack Overflow, Maven and similar platforms, so they should serve as useful references. Details of the KeyedStream class:
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream

About KeyedStream

A KeyedStream represents a DataStream on which operator state is partitioned by key using a provided KeySelector. Typical operations supported by a DataStream are also possible on a KeyedStream, with the exception of partitioning methods such as shuffle, forward and keyBy.

Reduce-style operations, such as #reduce, #sum and #fold, work on elements that have the same key.
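
As a minimal, self-contained sketch of this behaviour (the tuple element type, key position and sample values are illustrative, not taken from the snippets below): keyBy produces a KeyedStream, on which a reduce-style operation such as sum is then evaluated per key.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedStreamQuickStart {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> words = env.fromElements(
      Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1));

    // keyBy partitions the operator state by the first tuple field ...
    KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0);

    // ... so reduce-style operations such as sum run independently per key.
    keyed.sum(1).print();

    env.execute("KeyedStream quick start");
  }
}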

Code examples

Code example source: apache/flink

/**
 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time)}.
 *
 * @param size The size of the window.
 * @return The python windowed stream {@link PythonWindowedStream}
 */
public PythonWindowedStream time_window(Time size) {
  return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size));
}

Code example source: apache/flink

private static KeyedStream<Event, Integer> applyTestStatefulOperator(
  String name,
  JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
  KeyedStream<Event, Integer> source,
  List<TypeSerializer<ComplexPayload>> stateSer,
  List<Class<ComplexPayload>> stateClass) {
  return source
    .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
    .name(name)
    .uid(name)
    .returns(Event.class)
    .keyBy(Event::getKey);
}

Code example source: apache/flink

/**
 * Publishes the keyed stream as a queryable ReducingState instance.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor<T> stateDescriptor) {
  transform("Queryable state: " + queryableStateName,
      getType(),
      new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
  stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
  return new QueryableStateStream<>(
      queryableStateName,
      stateDescriptor,
      getKeyType().createSerializer(getExecutionConfig()));
}
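
A hedged usage sketch of this method (the state name "query-name", the key position and the reduce function are illustrative): it publishes the per-key running sum of a tuple stream as a queryable ReducingState, which a QueryableStateClient could then look up by name.

import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QueryableSumJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Long>> input = env.fromElements(
      Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("a", 3L));

    // Descriptor for the ReducingState that backs the queryable state instance.
    ReducingStateDescriptor<Tuple2<String, Long>> descriptor = new ReducingStateDescriptor<>(
      "sum",                                     // state name
      (a, b) -> Tuple2.of(a.f0, a.f1 + b.f1),    // merges values that share the same key
      TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

    // "query-name" is the name a QueryableStateClient would use for lookups.
    QueryableStateStream<Tuple, Tuple2<String, Long>> queryable =
      input.keyBy(0).asQueryableState("query-name", descriptor);

    env.execute("Queryable per-key sum");
  }
}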

Code example source: apache/flink

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
  StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
      clean(aggregate), getType().createSerializer(getExecutionConfig()));
  return transform("Keyed Aggregation", getType(), operator);
}
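
This protected helper backs the built-in per-key aggregations; from user code it is reached indirectly through methods such as sum, min or max. A short sketch, assuming input is an existing DataStream<Tuple2<String, Integer>> (the field positions are illustrative):

// Each built-in aggregation creates an AggregationFunction and delegates to aggregate(...).
DataStream<Tuple2<String, Integer>> sumPerKey = input.keyBy(0).sum(1);
DataStream<Tuple2<String, Integer>> maxPerKey = input.keyBy(0).max(1);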

Code example source: apache/flink

/**
 * Applies a fold transformation on the grouped data stream grouped on by
 * the given key position. The {@link FoldFunction} will receive input
 * values based on the key value. Only input values with the same key will
 * go to the same folder.
 *
 * @param folder
 *            The {@link FoldFunction} that will be called for every element
 *            of the input values with the same key.
 * @param initialValue
 *            The initialValue passed to the folders for each key.
 * @return The transformed DataStream.
 *
 * @deprecated will be removed in a future version
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) {
  TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
      clean(folder), getType(), Utils.getCallLocationName(), true);
  return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
}
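
A hedged usage sketch of the deprecated fold, shown only because the snippet above documents it: per key, elements are folded into an accumulator that starts from the initial value. Here input is an assumed DataStream<Tuple2<String, Integer>>, the string accumulator is illustrative, and FoldFunction is org.apache.flink.api.common.functions.FoldFunction.

DataStream<String> folded = input
  .keyBy(0)
  .fold("start", new FoldFunction<Tuple2<String, Integer>, String>() {
    @Override
    public String fold(String accumulator, Tuple2<String, Integer> value) {
      // Called once per element; all elements with the same key share one accumulator.
      return accumulator + "-" + value.f1;
    }
  });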

Code example source: apache/flink

private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
  // opName is built from the window assigner, trigger and evictor earlier in this method (not shown here).
  KeySelector<T, K> keySel = input.getKeySelector();
  WindowOperator<K, T, Iterable<T>, R, W> operator;
  if (evictor != null) {
    @SuppressWarnings({"unchecked", "rawtypes"})
    TypeSerializer<StreamRecord<T>> streamRecordSerializer =
        (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    ListStateDescriptor<StreamRecord<T>> stateDesc =
        new ListStateDescriptor<>("window-contents", streamRecordSerializer);
    operator = new EvictingWindowOperator<>(windowAssigner,
        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
        keySel,
        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
        stateDesc,
        function,
        trigger,
        evictor,
        allowedLateness,
        lateDataOutputTag);
  } else {
    ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
        input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    operator = new WindowOperator<>(windowAssigner,
        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
        keySel,
        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
        stateDesc,
        function,
        trigger,
        allowedLateness,
        lateDataOutputTag);
  }
  return input.transform(opName, resultType, operator);
}
Code example source: apache/flink

.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
  "custom_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
  new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
.keyBy(0)
.transform(
  "timely_stateful_operator",
  new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),

Code example source: apache/flink

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
  if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
    return window(TumblingProcessingTimeWindows.of(size));
  } else {
    return window(TumblingEventTimeWindows.of(size));
  }
}
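
Typical usage of this shortcut (window size and aggregation are illustrative; input is an assumed DataStream<Tuple2<String, Integer>> and Time comes from org.apache.flink.streaming.api.windowing.time.Time):

DataStream<Tuple2<String, Integer>> windowedSums = input
  .keyBy(0)
  .timeWindow(Time.seconds(5))   // tumbling 5-second windows, event or processing time
  .sum(1);                       // per-key sum within each window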

Code example source: apache/flink

.process(new ProcessFunction<Integer, Integer>() {
  private static final long serialVersionUID = 1L;

Code example source: apache/flink

@Test
public void testNestedPojoFieldAccessor() throws Exception {
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.getConfig().disableObjectReuse();
  see.setParallelism(4);
  DataStream<Data> dataStream = see.fromCollection(elements);
  DataStream<Data> summedStream = dataStream
    .keyBy("aaa")
    .sum("stats.count")
    .keyBy("aaa")
    .flatMap(new FlatMapFunction<Data, Data>() {
      Data[] first = new Data[3];
      @Override
      public void flatMap(Data value, Collector<Data> out) throws Exception {
        if (first[value.aaa] == null) {
          first[value.aaa] = value;
          if (value.stats.count != 123) {
            throw new RuntimeException("Expected stats.count to be 123");
          }
        } else {
          if (value.stats.count != 2 * 123) {
            throw new RuntimeException("Expected stats.count to be 2 * 123");
          }
        }
      }
    });
  summedStream.print();
  see.execute();
}
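
The Data POJO used by this test is not part of the snippet; a minimal shape consistent with keyBy("aaa") and sum("stats.count") might look like the following (names and field types are inferred, so treat them as assumptions):

// Hypothetical POJOs matching the field expressions in the test above.
public static class Stats {
  public long count;      // aggregated by sum("stats.count")
  public Stats() {}
}

public static class Data {
  public int aaa;         // grouping key used by keyBy("aaa"), also used as an array index above
  public Stats stats;     // nested POJO holding the aggregated counter
  public Data() {}
}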

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

Code example source: apache/flink

public <OUT> SingleOutputStreamOperator<OUT> process(
    ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
    TypeInformation<OUT> outputType) {
  Preconditions.checkNotNull(processJoinFunction);
  Preconditions.checkNotNull(outputType);
  final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
  final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
    new IntervalJoinOperator<>(
      lowerBound,
      upperBound,
      lowerBoundInclusive,
      upperBoundInclusive,
      left.getType().createSerializer(left.getExecutionConfig()),
      right.getType().createSerializer(right.getExecutionConfig()),
      cleanedUdf
    );
  return left
    .connect(right)
    .keyBy(keySelector1, keySelector2)
    .transform("Interval Join", outputType, operator);
}

Code example source: apache/flink

.process(new Tokenizer());
.window(TumblingEventTimeWindows.of(Time.seconds(5)))

Code example source: apache/flink

/**
 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
 *
 * @param outputType {@link TypeInformation} for the result type of the function.
 *
 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 */
@Internal
public <R> SingleOutputStreamOperator<R> process(
    KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
    TypeInformation<R> outputType) {
  KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
  return transform("KeyedProcess", outputType, operator);
}
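
A hedged usage sketch of a KeyedProcessFunction that this method (or its public counterpart without the explicit TypeInformation) would apply: it keeps a per-key counter in ValueState and registers a processing-time timer. The state name, the one-minute delay and the output format are illustrative.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts elements per key and emits the current count one minute after each element arrives.
public class CountWithTimeout extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Long>> {

  private transient ValueState<Long> countState;

  @Override
  public void open(Configuration parameters) {
    countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
  }

  @Override
  public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    Long current = countState.value();
    countState.update(current == null ? 1L : current + 1);
    // Timers are only available on keyed streams; fire one minute from now (processing time).
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60_000L);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // Emit the count for the key whose timer fired.
    out.collect(Tuple2.of(ctx.getCurrentKey(), countState.value()));
  }
}
// Applied with something like: input.keyBy(<String key selector>).process(new CountWithTimeout())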

Code example source: apache/flink

if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {
  KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
  KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;

  TypeInformation<?> keyType1 = keyedInput1.getKeyType();
  TypeInformation<?> keyType2 = keyedInput2.getKeyType();
  if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
    throw new UnsupportedOperationException("Key types if input KeyedStreams " +
        "don't match: " + keyType1 + " and " + keyType2 + ".");
  }

  transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
  transform.setStateKeyType(keyType1);
}
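
The key-type check above runs when two keyed streams are connected. A hedged sketch of the user-facing pattern, assuming orders and payments are existing DataStream<Tuple2<String, Integer>> inputs (CoFlatMapFunction is org.apache.flink.streaming.api.functions.co.CoFlatMapFunction):

// Both sides are keyed on the same field, so the key types match and the check passes.
DataStream<String> tagged = orders
  .keyBy(0)
  .connect(payments.keyBy(0))
  .flatMap(new CoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
    @Override
    public void flatMap1(Tuple2<String, Integer> order, Collector<String> out) {
      out.collect("order: " + order.f0);
    }
    @Override
    public void flatMap2(Tuple2<String, Integer> payment, Collector<String> out) {
      out.collect("payment: " + payment.f0);
    }
  });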

Code example source: apache/flink

public StreamExecutionEnvironment getExecutionEnvironment() {
  return input.getExecutionEnvironment();
}

Code example source: apache/flink

1,
  TypeExtractor.NO_INDEX,
  getType(),
  Utils.getCallLocationName(),
  true);
return process(processFunction, outType);

Code example source: apache/flink

.connect(npBroadcastStream)
.process(firstBroadcastFunction).uid("BrProcess1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.connect(pBroadcastStream)
.process(secondBroadcastFunction).uid("BrProcess2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

Code example source: apache/flink

.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());

Code example source: apache/flink

KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
patternStream = keyedStream.transform(
  "CepOperator",
  outTypeInfo,
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
patternStream = inputStream.keyBy(keySelector).transform(
  "GlobalCepOperator",
  outTypeInfo,
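
The two branches above correspond to running CEP on a keyed stream (matching state kept per key) versus a non-keyed stream (a single global matcher). A hedged sketch of how that choice appears in user code, assuming input is an existing DataStream<Event> whose Event type has a getKey() accessor, with an illustrative two-step pattern (CEP, Pattern and PatternStream come from the flink-cep module):

// Hypothetical pattern: an event followed by another event.
Pattern<Event, ?> pattern = Pattern.<Event>begin("first").next("second");

// Keyed input: matching state is kept per key (the "CepOperator" branch).
PatternStream<Event> keyedPatterns = CEP.pattern(input.keyBy(Event::getKey), pattern);

// Non-keyed input: a single global matcher (the "GlobalCepOperator" branch).
PatternStream<Event> globalPatterns = CEP.pattern(input, pattern);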
