Usage of the org.apache.flink.streaming.api.datastream.AllWindowedStream class, with code examples


This article collects Java code examples for org.apache.flink.streaming.api.datastream.AllWindowedStream and shows how the AllWindowedStream class is used in practice. The examples mainly come from platforms such as GitHub, Stack Overflow and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the AllWindowedStream class follow:
Package: org.apache.flink.streaming.api.datastream
Class name: AllWindowedStream

Introduction to AllWindowedStream

An AllWindowedStream represents a data stream where the stream of elements is split into windows based on a org.apache.flink.streaming.api.windowing.assigners.WindowAssigner. Window emission is triggered based on a org.apache.flink.streaming.api.windowing.triggers.Trigger.

If an org.apache.flink.streaming.api.windowing.evictors.Evictor is specified it will be used to evict elements from the window after evaluation was triggered by the Trigger but before the actual evaluation of the window. When using an evictor, window performance will degrade significantly, since pre-aggregation of window results cannot be used.

Note that the AllWindowedStream is purely an API construct, during runtime the AllWindowedStream will be collapsed together with the operation over the window into one single operation.
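Before the collected examples, here is a minimal end-to-end usage sketch of the API described above. It is not taken from the sources below; the element values, window size, trigger and evictor settings are illustrative assumptions.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class AllWindowedStreamSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Long> numbers = env.fromElements(1L, 2L, 3L, 4L, 5L);

    // windowAll() creates the AllWindowedStream; trigger() replaces the assigner's default
    // trigger, and evictor() removes elements before each evaluation.
    numbers
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .trigger(CountTrigger.of(3))   // fire after three elements (illustrative)
        .evictor(CountEvictor.of(2))   // keep at most two elements per evaluation (illustrative)
        .reduce((a, b) -> a + b)
        .print();

    env.execute("AllWindowedStream sketch");
  }
}

Because an evictor is set, the reduce cannot be pre-aggregated; each evaluation iterates over the buffered window contents, as noted in the class description.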

Code examples

Code example source: origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Note that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}
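A hedged usage sketch for this apply() overload follows; the string elements, the one-second processing-time window and the counting logic are assumptions made for illustration, not part of the Flink snippet above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ApplyExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        // The whole window content is buffered and handed to apply() as an Iterable.
        .apply(new AllWindowFunction<String, Long, TimeWindow>() {
          @Override
          public void apply(TimeWindow window, Iterable<String> values, Collector<Long> out) {
            long count = 0;
            for (String ignored : values) {
              count++;
            }
            out.collect(count); // emit one element count per window
          }
        })
        .print();

    env.execute("apply sketch");
  }
}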

Code example source: origin: apache/flink

private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
  return reduce(aggregator);
}

Code example source: origin: apache/flink

/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the fold function is
 * interpreted as a regular non-windowed stream.
 *
 * @param function The fold function.
 * @return The data stream that is the result of applying the fold function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
  if (function instanceof RichFunction) {
    throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
        "Please use fold(FoldFunction, WindowFunction) instead.");
  }
  return fold(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType, resultType);
}
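Since fold() is deprecated in favor of aggregate(), here is a hedged sketch of the aggregate() path; the AverageAggregate class, the integer elements and the window size are illustrative assumptions.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AggregateExample {

  // Incrementally maintains (sum, count) and emits the average per window (illustrative).
  public static class AverageAggregate implements AggregateFunction<Integer, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
      return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Integer value, Tuple2<Long, Long> acc) {
      return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
      return acc.f1 == 0 ? 0.0 : ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
      return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1, 2, 3, 4, 5)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .aggregate(new AverageAggregate()) // incremental, no per-window buffering
        .print();

    env.execute("aggregate sketch");
  }
}

Unlike apply() or process(), the accumulator is updated incrementally as elements arrive, so the window contents do not need to be buffered.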

Code example source: origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given reducer.
 *
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} instead.
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
  TypeInformation<T> inType = input.getType();
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
  return apply(reduceFunction, function, resultType);
}

Code example source: origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given reducer.
 *
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The process window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> reduce(
    ReduceFunction<T> reduceFunction,
    ProcessAllWindowFunction<T, R, W> function) {
  TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, input.getType());
  return reduce(reduceFunction, function, resultType);
}
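A hedged usage sketch for this reduce() overload, combining an incremental reducer with a ProcessAllWindowFunction that can read window metadata; the integer elements, window size and output format are assumptions for illustration.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ReduceWithProcessFunctionExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1, 5, 3)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .reduce(
            // Incremental part: keep only the running maximum per window.
            (ReduceFunction<Integer>) (a, b) -> a >= b ? a : b,
            // Window part: receives the single pre-reduced value plus window metadata.
            new ProcessAllWindowFunction<Integer, Tuple2<Long, Integer>, TimeWindow>() {
              @Override
              public void process(Context ctx, Iterable<Integer> maxValue, Collector<Tuple2<Long, Integer>> out) {
                out.collect(Tuple2.of(ctx.window().getEnd(), maxValue.iterator().next()));
              }
            })
        .print();

    env.execute("reduce + process sketch");
  }
}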

Code example source: origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given reducer.
 *
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> reduce(
    ReduceFunction<T> reduceFunction,
    AllWindowFunction<T, R, W> function) {
  TypeInformation<T> inType = input.getType();
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
  return reduce(reduceFunction, function, resultType);
}

Code example source: origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given fold function.
 *
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction)} instead.
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
  TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
      Utils.getCallLocationName(), true);
  return apply(initialValue, foldFunction, function, resultType);
}

Code example source: origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Note that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The process window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
}
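A hedged sketch of calling process() directly, where the whole window is buffered and exposed through the Context; the string elements, window size and output format are assumptions for illustration.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ProcessAllExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
          @Override
          public void process(Context ctx, Iterable<String> elements, Collector<String> out) {
            // All elements of the window are buffered and iterated here.
            StringBuilder joined = new StringBuilder();
            for (String e : elements) {
              joined.append(e);
            }
            out.collect("window ending at " + ctx.window().getEnd() + ": " + joined);
          }
        })
        .print();

    env.execute("process sketch");
  }
}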

Code example source: origin: apache/flink

.trigger(PurgingTrigger.of(CountTrigger.of(5)))
.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
  @Override
  public void apply(GlobalWindow window,
.trigger(PurgingTrigger.of(CountTrigger.of(5)))
.fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
  private static final long serialVersionUID = 1L;

Code example source: origin: apache/flink

.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

Code example source: origin: apache/flink

.trigger(PurgingTrigger.of(CountTrigger.of(10)))
.fold(0L, new FoldFunction<Long, Long>() {
  private static final long serialVersionUID = 1L;

Code example source: origin: apache/flink

/**
 * Applies an aggregation that sums every window of the data stream at the
 * given position.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
  return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
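A hedged usage sketch for sum() on tuple input; the tuple layout, element values and window size are assumptions for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SumExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("c", 3))
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .sum(1)   // sums tuple field 1 across the whole (non-keyed) window
        .print();

    env.execute("sum sketch");
  }
}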

Code example source: origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .reduce(reducer);
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: origin: apache/flink

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}
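A hedged usage sketch for countWindowAll(size, slide); the element values and the size/slide of 4/2 are assumptions for illustration.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountWindowAllExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
        // GlobalWindows + CountTrigger(slide) + CountEvictor(size) under the hood, as the
        // method body above shows; the resulting operator runs non-parallel.
        .countWindowAll(4, 2)
        .reduce((a, b) -> a + b)   // sum of each 4-element window, emitted every 2 elements
        .print();

    env.execute("countWindowAll sketch");
  }
}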

Code example source: origin: apache/flink

@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .trigger(CountTrigger.of(1))
      .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: origin: apache/flink

/**
 * Applies an aggregation that gives the maximum element of every window of
 * the data stream by the given position. If more elements have the same
 * maximum value the operator returns the first by default.
 *
 * @param positionToMaxBy
 *            The position to maximize by
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
  return this.maxBy(positionToMaxBy, true);
}
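A hedged usage sketch for maxBy() with a field name; the Trade POJO, its "price" field and the sample values are invented for illustration (with tuple input an index such as maxBy(1) works the same way).

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MaxByExample {

  // Simple POJO; a public no-argument constructor and public fields make it a valid Flink POJO type.
  public static class Trade {
    public String symbol;
    public double price;

    public Trade() {}

    public Trade(String symbol, double price) {
      this.symbol = symbol;
      this.price = price;
    }

    @Override
    public String toString() {
      return symbol + "@" + price;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(new Trade("AAA", 10.0), new Trade("BBB", 12.5), new Trade("CCC", 11.0))
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .maxBy("price")   // returns the whole element with the maximum "price" field
        .print();

    env.execute("maxBy sketch");
  }
}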

Code example source: origin: apache/flink

@Test
public void testAggregateWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .aggregate(new DummyAggregationFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: origin: apache/flink

@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
    keySel,
    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
    stateDesc,
    function,
    input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
    keySel,
    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
    stateDesc,
    function,

Code example source: origin: apache/flink

.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;
