Usage and Code Examples of the org.apache.flink.streaming.api.datastream.AllWindowedStream.aggregate() Method

x33g5p2x · Reposted on 2022-01-17

This article collects a number of Java code examples of the org.apache.flink.streaming.api.datastream.AllWindowedStream.aggregate() method and shows how AllWindowedStream.aggregate() is used. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. The details of the AllWindowedStream.aggregate() method are as follows:
Package path: org.apache.flink.streaming.api.datastream.AllWindowedStream
Class name: AllWindowedStream
Method name: aggregate

Overview of AllWindowedStream.aggregate

Applies the given AggregateFunction to each window. The AggregateFunction aggregates all elements of a window into a single result element. The stream of these result elements (one per window) is interpreted as a regular non-windowed stream.
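Before turning to the source excerpts, here is a minimal, self-contained usage sketch of aggregate(). The AverageAggregate class and the job wiring are illustrative assumptions (they do not appear in the excerpts below); they only show the typical shape of an AggregateFunction and how it is handed to windowAll(...).aggregate(...).

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AllWindowAggregateExample {

  // Illustrative AggregateFunction: averages the Integer field of Tuple2<String, Integer> elements.
  // The accumulator is a (sum, count) pair; the result is the average as a Double.
  public static class AverageAggregate
      implements AggregateFunction<Tuple2<String, Integer>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
      return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Integer> value, Tuple2<Long, Long> acc) {
      return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
      return acc.f1 == 0L ? 0.0 : ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
      return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // In a real job this would be an unbounded source; with this tiny bounded demo source,
    // a processing-time window may not fire before the job finishes.
    DataStream<Tuple2<String, Integer>> source =
        env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2), Tuple2.of("world", 3));

    // One average per 5-second window over the whole (non-keyed) stream.
    DataStream<Double> averages = source
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(new AverageAggregate());

    averages.print();
    env.execute("AllWindowedStream.aggregate example");
  }
}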

Code Examples

Code example source: apache/flink

/**
 * Applies an aggregation that sums every window of the data stream at the
 * given position.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
  return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
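
A hedged usage sketch of this helper (counts is a hypothetical DataStream<Tuple2<String, Integer>>, reusing the imports from the sketch above):

DataStream<Tuple2<String, Integer>> windowSums = counts
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .sum(1);   // positionToSum = 1: the Integer field of each tuple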

Code example source: apache/flink

/**
 * Applies an aggregation that gives the minimum value of every window
 * of the data stream at the given position.
 *
 * @param positionToMin The position to minimize
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> min(int positionToMin) {
  return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
}

Code example source: apache/flink

/**
 * Applies an aggregation that gives the maximum value of every window of
 * the data stream at the given position.
 *
 * @param positionToMax The position to maximize
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> max(int positionToMax) {
  return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
}
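
A hedged sketch for the position-based min/max variants (events is a hypothetical DataStream<Tuple3<String, Integer, Long>>). Note that min/max only guarantee the extreme value at the given position; the other fields of the emitted record need not come from the same input element:

DataStream<Tuple3<String, Integer, Long>> minPerWindow = events
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .min(1);   // minimum of field 1 in each window
DataStream<Tuple3<String, Integer, Long>> maxPerWindow = events
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .max(1);   // maximum of field 1 in each window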

Code example source: apache/flink

/**
 * Applies an aggregation that gives the maximum value of the pojo data
 * stream at the given field expression for every window. A field expression
 * is either the name of a public field or a getter method with parentheses
 * of the {@link DataStream DataStream}'s underlying type. A dot can be used to drill
 * down into objects, as in {@code "field1.getInnerField2()" }.
 *
 * @param field The field expression based on which the aggregation will be applied.
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> max(String field) {
  return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
}
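
For POJO streams the same aggregations can be addressed by field expression. SensorReading and its fields below are hypothetical, chosen only to illustrate the expression syntax described in the Javadoc (readings is a hypothetical DataStream<SensorReading>):

// SensorReading is assumed to be a POJO with a public field "temperature"
// and a public field "sensor" whose object exposes getValue().
DataStream<SensorReading> maxTemperature = readings
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .max("temperature");           // aggregate on a public field
DataStream<SensorReading> maxNestedValue = readings
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .max("sensor.getValue()");     // drill into a nested object via a getter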

Code example source: apache/flink

/**
 * Applies an aggregation that gives the minimum element of every window of
 * the data stream by the given position. If more elements have the same
 * minimum value the operator returns either the first or last one depending
 * on the parameter setting.
 *
 * @param positionToMinBy The position to minimize
 * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
  return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
}

Code example source: apache/flink

/**
 * Applies an aggregation that gives the maximum element of every window of
 * the data stream by the given position. If more elements have the same
 * maximum value the operator returns either the first or last one depending
 * on the parameter setting.
 *
 * @param positionToMaxBy The position to maximize by
 * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
  return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
}
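
In contrast to min/max, the minBy/maxBy variants return the whole element that carries the extreme value. A hedged sketch (events is a hypothetical DataStream<Tuple2<String, Integer>>):

DataStream<Tuple2<String, Integer>> largestPerWindow = events
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .maxBy(1, true);   // whole tuple with the largest field 1; on ties, the first one seen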

Code example source: apache/flink

/**
 * Applies an aggregation that sums the given field of the pojo data stream
 * for every window.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
  return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}

Code example source: apache/flink

/**
 * Applies an aggregation that gives the minimum value of the pojo data
 * stream at the given field expression for every window.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the {@link DataStream}'s underlying type. A dot can be used to drill down into
 * objects, as in {@code "field1.getInnerField2()" }.
 *
 * @param field The field expression based on which the aggregation will be applied.
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> min(String field) {
  return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
}

Code example source: apache/flink

/**
 * Applies an aggregation that gives the minimum element of the pojo
 * data stream by the given field expression for every window. A field
 * expression is either the name of a public field or a getter method with
 * parentheses of the {@link DataStream DataStream}'s underlying type. A dot can be used
 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
 *
 * @param field The field expression based on which the aggregation will be applied.
 * @param first If true, then in case of field equality the first object will be returned
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
  return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
}

Code example source: apache/flink

/**
 * Applies an aggregation that gives the maximum element of the pojo
 * data stream by the given field expression for every window. A field
 * expression is either the name of a public field or a getter method with
 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
 *
 * @param field The field expression based on which the aggregation will be applied.
 * @param first If true, then in case of field equality the first object will be returned
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
  return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
}
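
The same by-field form, sketched against the hypothetical SensorReading POJO used earlier:

DataStream<SensorReading> hottestReading = readings
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .maxBy("temperature", true);   // whole POJO with the highest temperature; first one on ties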

Code example source: apache/flink

/**
 * Applies the given {@code AggregateFunction} to each window. The AggregateFunction
 * aggregates all elements of a window into a single result element. The stream of these
 * result elements (one per window) is interpreted as a regular non-windowed stream.
 *
 * @param function The aggregation function.
 * @param accumulatorType Type information for the accumulator type of the aggregate function.
 * @param resultType Type information for the result type of the aggregate function.
 * @return The data stream that is the result of applying the aggregation function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            AggregateFunction's result type
 */
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(
    AggregateFunction<T, ACC, R> function,
    TypeInformation<ACC> accumulatorType,
    TypeInformation<R> resultType) {
  checkNotNull(function, "function");
  checkNotNull(accumulatorType, "accumulatorType");
  checkNotNull(resultType, "resultType");
  if (function instanceof RichFunction) {
    throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
  }
  return aggregate(function, new PassThroughAllWindowFunction<W, R>(),
      accumulatorType, resultType);
}
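
A hedged sketch of calling this overload with explicit type information, useful when type extraction cannot infer the accumulator or result type (for example with lambdas or generic functions). AverageAggregate is the illustrative class from the first sketch; the type hints shown are assumptions that match it, and the imports for TypeInformation, TypeHint, and BasicTypeInfo are omitted here:

DataStream<Double> averages = source
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(
        new AverageAggregate(),                                     // AggregateFunction<Tuple2<String, Integer>, Tuple2<Long, Long>, Double>
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),  // accumulator type
        BasicTypeInfo.DOUBLE_TYPE_INFO);                            // result type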

Code example source: apache/flink

/**
 * Applies the given {@code AggregateFunction} to each window. The AggregateFunction
 * aggregates all elements of a window into a single result element. The stream of these
 * result elements (one per window) is interpreted as a regular non-windowed stream.
 *
 * @param function The aggregation function.
 * @return The data stream that is the result of applying the aggregation function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            AggregateFunction's result type
 */
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
  checkNotNull(function, "function");
  if (function instanceof RichFunction) {
    throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
  }
  TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
      function, input.getType(), null, false);
  TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
      function, input.getType(), null, false);
  return aggregate(function, accumulatorType, resultType);
}

Code example source: apache/flink

return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);

Code example source: apache/flink

return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
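
The two truncated excerpts above come from the overloads that combine an AggregateFunction with an all-window function: the AggregateFunction incrementally reduces each window's elements, and the window function then sees only the single aggregated value plus window metadata. A hedged sketch of that combined form, reusing the illustrative AverageAggregate from the first sketch (imports for ProcessAllWindowFunction, TimeWindow, and Collector are omitted here):

DataStream<String> labelled = source
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(
        new AverageAggregate(),
        new ProcessAllWindowFunction<Double, String, TimeWindow>() {
          @Override
          public void process(Context context, Iterable<Double> aggregates, Collector<String> out) {
            Double avg = aggregates.iterator().next();   // exactly one pre-aggregated value per window
            out.collect("window ending at " + context.window().getEnd() + " -> average " + avg);
          }
        });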

Code example source: apache/flink

/**
 * .aggregate() does not support RichAggregateFunction, since the AggregateFunction is used internally
 * in an {@code AggregatingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
  fail("exception was not thrown");
}

Code example source: apache/flink

@Test
public void testAggregateEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyAggregationFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyAggregationFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateWithWindowFunctionEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .aggregate(new DummyAggregationFunction(), new TestAllWindowFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window = source
      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .aggregate(new DummyAggregationFunction(), new TestAllWindowFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .aggregate(new DummyAggregationFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
