
x33g5p2x  于2022-01-17 转载在 其他  



[英]An AllWindowedStream represents a data stream where the stream of elements is split into windows based on an org.apache.flink.streaming.api.windowing.assigners.WindowAssigner. Window emission is triggered based on an org.apache.flink.streaming.api.windowing.triggers.Trigger.

If an org.apache.flink.streaming.api.windowing.evictors.Evictor is specified, it will be used to evict elements from the window after evaluation was triggered by the Trigger but before the actual evaluation of the window. When using an evictor, window performance will degrade significantly, since pre-aggregation of window results cannot be used.

Note that the AllWindowedStream is purely an API construct; at runtime, the AllWindowedStream will be collapsed together with the operation over the window into one single operation.


代码示例来源:origin: apache/flink

 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 * <p>Note that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);

代码示例来源:origin: apache/flink

private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
  return reduce(aggregator);

代码示例来源:origin: apache/flink

 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the fold function is
 * interpreted as a regular non-windowed stream.
 * @param function The fold function.
 * @return The data stream that is the result of applying the fold function to the window.
 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
  if (function instanceof RichFunction) {
    throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
        "Please use fold(FoldFunction, WindowFunction) instead.");
  return fold(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType, resultType);

代码示例来源:origin: apache/flink

 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 * <p>Arriving data is incrementally aggregated using the given reducer.
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} instead.
public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
  TypeInformation<T> inType = input.getType();
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
  return apply(reduceFunction, function, resultType);

代码示例来源:origin: apache/flink

 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 * <p>Arriving data is incrementally aggregated using the given reducer.
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The process window function.
 * @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> reduce(
    ReduceFunction<T> reduceFunction,
    ProcessAllWindowFunction<T, R, W> function) {
  TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, input.getType());
  return reduce(reduceFunction, function, resultType);

代码示例来源:origin: apache/flink

 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 * <p>Arriving data is incrementally aggregated using the given reducer.
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> reduce(
    ReduceFunction<T> reduceFunction,
    AllWindowFunction<T, R, W> function) {
  TypeInformation<T> inType = input.getType();
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
  return reduce(reduceFunction, function, resultType);

代码示例来源:origin: apache/flink

 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 * <p>Arriving data is incrementally aggregated using the given fold function.
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction)} instead.
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
  TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
      Utils.getCallLocationName(), true);
  return apply(initialValue, foldFunction, function, resultType);

代码示例来源:origin: apache/flink

 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 * <p>Note that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 * @param function The process window function.
 * @return The data stream that is the result of applying the window function to the window.
public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);

代码示例来源:origin: apache/flink

.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
  public void apply(GlobalWindow window,
.fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

.fold(0L, new FoldFunction<Long, Long>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

 * Applies an aggregation that sums every window of the data stream at the
 * given position.
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
public SingleOutputStreamOperator<T> sum(int positionToSum) {
  return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));

代码示例来源:origin: apache/flink

public void testReduceWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

代码示例来源:origin: apache/flink

 * Windows this {@code DataStream} into sliding count windows.
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())

代码示例来源:origin: apache/flink

@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

代码示例来源:origin: apache/flink

public void testApplyWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        public void apply(
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

代码示例来源:origin: apache/flink

 * Applies an aggregation that gives the maximum element of every window of
 * the data stream by the given position. If several elements share the same
 * maximum value, the operator returns the first one by default.
 * @param positionToMaxBy
 *            The position to maximize by
 * @return The transformed DataStream.
public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
  return this.maxBy(positionToMaxBy, true);

代码示例来源:origin: apache/flink

public void testAggregateWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyAggregationFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

代码示例来源:origin: apache/flink

@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

代码示例来源:origin: apache/flink

.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;
