本文整理了Java中org.apache.flink.streaming.api.datastream.AllWindowedStream.aggregate()
方法的一些代码示例,展示了AllWindowedStream.aggregate()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AllWindowedStream.aggregate()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.AllWindowedStream
类名称:AllWindowedStream
方法名:aggregate
[英]Applies the given AggregateFunction to each window. The AggregateFunction aggregates all elements of a window into a single result element. The stream of these result elements (one per window) is interpreted as a regular non-windowed stream.
[中]将给定的AggregateFunction应用于每个窗口。AggregateFunction将窗口的所有元素聚合到单个结果元素中。这些结果元素的流(每个窗口一个)被解释为常规的非窗口流。
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that that gives the minimum value of every window
* of the data stream at the given position.
*
* @param positionToMin The position to minimize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> min(int positionToMin) {
return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that gives the maximum value of every window of
* the data stream at the given position.
*
* @param positionToMax The position to maximize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> max(int positionToMax) {
return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that that gives the maximum value of the pojo data
* stream at the given field expression for every window. A field expression
* is either the name of a public field or a getter method with parentheses
* of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
* down into objects, as in {@code "field1.getInnerField2()" }.
*
* @param field The field expression based on which the aggregation will be applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> max(String field) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that gives the minimum element of every window of
* the data stream by the given position. If more elements have the same
* minimum value the operator returns either the first or last one depending
* on the parameter setting.
*
* @param positionToMinBy The position to minimize
* @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that gives the maximum element of every window of
* the data stream by the given position. If more elements have the same
* maximum value the operator returns either the first or last one depending
* on the parameter setting.
*
* @param positionToMaxBy The position to maximize by
* @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that sums every window of the pojo data stream at
* the given field for every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that that gives the minimum value of the pojo data
* stream at the given field expression for every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream}S underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field The field expression based on which the aggregation will be applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> min(String field) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that that gives the minimum element of the pojo
* data stream by the given field expression for every window. A field
* expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
* to drill down into objects, as in {@code "field1.getInnerField2()" }.
*
* @param field The field expression based on which the aggregation will be applied.
* @param first If True then in case of field equality the first object will be returned
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies an aggregation that that gives the maximum element of the pojo
* data stream by the given field expression for every window. A field
* expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream}S underlying type. A dot can be used
* to drill down into objects, as in {@code "field1.getInnerField2()" }.
*
* @param field The field expression based on which the aggregation will be applied.
* @param first If True then in case of field equality the first object will be returned
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
}
代码示例来源:origin: apache/flink
/**
* Applies the given {@code AggregateFunction} to each window. The AggregateFunction
* aggregates all elements of a window into a single result element. The stream of these
* result elements (one per window) is interpreted as a regular non-windowed stream.
*
* @param function The aggregation function.
* @return The data stream that is the result of applying the aggregation function to the window.
*
* @param <ACC> The type of the AggregateFunction's accumulator
* @param <R> The type of the elements in the resulting stream, equal to the
* AggregateFunction's result type
*/
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, R> function,
TypeInformation<ACC> accumulatorType,
TypeInformation<R> resultType) {
checkNotNull(function, "function");
checkNotNull(accumulatorType, "accumulatorType");
checkNotNull(resultType, "resultType");
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
}
return aggregate(function, new PassThroughAllWindowFunction<W, R>(),
accumulatorType, resultType);
}
代码示例来源:origin: apache/flink
/**
* Applies the given {@code AggregateFunction} to each window. The AggregateFunction
* aggregates all elements of a window into a single result element. The stream of these
* result elements (one per window) is interpreted as a regular non-windowed stream.
*
* @param function The aggregation function.
* @return The data stream that is the result of applying the fold function to the window.
*
* @param <ACC> The type of the AggregateFunction's accumulator
* @param <R> The type of the elements in the resulting stream, equal to the
* AggregateFunction's result type
*/
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
checkNotNull(function, "function");
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
}
TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
function, input.getType(), null, false);
TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
function, input.getType(), null, false);
return aggregate(function, accumulatorType, resultType);
}
代码示例来源:origin: apache/flink
return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
代码示例来源:origin: apache/flink
return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
代码示例来源:origin: apache/flink
/**
* .aggregate() does not support RichAggregateFunction, since the AggregateFunction is used internally
* in an {@code AggregatingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateWithWindowFunctionEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestAllWindowFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestAllWindowFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.aggregate(new DummyAggregationFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(
winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
内容来源于网络,如有侵权,请联系作者删除!