本文整理了Java中org.apache.flink.streaming.api.windowing.time.Time.of()
方法的一些代码示例,展示了Time.of()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Time.of()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.windowing.time.Time
类名称:Time
方法名:of
[英]Creates a new Time of the given duration and TimeUnit.
The Time refers to the time characteristic that is set on the dataflow via org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic).
[中]创建给定持续时间和时间单位的新时间。
时间是指通过org在数据流上设置的时间特征。阿帕奇。弗林克。流动。应用程序编程接口。环境StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)。
代码示例来源:origin: apache/flink
/**
* .aggregate() does not support RichAggregateFunction, since the AggregateFunction is used internally
* in an {@code AggregatingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
/**
* .reduce() does not support RichReduceFunction, since the reduce function is used internally
* in a {@code ReducingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testReduceWithRichReducerFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = -6448847205314995812L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
/**
* .aggregate() does not support RichAggregateFunction, since the AggregationFunction is used internally
* in a {@code AggregatingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
/**
* .reduce() does not support RichReduceFunction, since the reduce function is used internally
* in a {@code ReducingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testReduceWithRichReducerFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -6448847205314995812L;
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
代码示例来源:origin: apache/flink
@Test
public void testSlidingEventTimeWindowsApply() throws Exception {
closeCalled.set(0);
final int windowSize = 3;
final int windowSlide = 1;
ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
SlidingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
0,
null /* late data output tag */);
testSlidingEventTimeWindows(operator);
// we close once in the rest...
Assert.assertEquals("Close was not called.", 2, closeCalled.get());
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testReduceEventTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("unchecked")
public void testSlidingEventTimeWindowsReduce() throws Exception {
closeCalled.set(0);
final int windowSize = 3;
final int windowSlide = 1;
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
new SumReducer(),
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
SlidingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
0,
null /* late data output tag */);
testSlidingEventTimeWindows(operator);
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(
Tuple2.of("hello", 1),
Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.fold(new Tuple2<>("", 1), new DummyFolder());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testReduceEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testReduceProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
public void testAggregateProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testFoldProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple3<>("", "", 0), new DummyFolder());
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple3<>("", "", 1), new DummyFolder());
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithCustomTrigger() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(1))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window1 = source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple3<>("", "", 1), new DummyFolder());
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testReduceEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
代码示例来源:origin: apache/flink
@Test
@SuppressWarnings("unchecked")
public void testTumblingEventTimeWindowsApply() throws Exception {
closeCalled.set(0);
final int windowSize = 3;
ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
0,
null /* late data output tag */);
testTumblingEventTimeWindows(operator);
// we close once in the rest...
Assert.assertEquals("Close was not called.", 2, closeCalled.get());
}
内容来源于网络,如有侵权,请联系作者删除!