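This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.timeWindow() and shows how KeyedStream.timeWindow() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. Details of the KeyedStream.timeWindow() method are as follows: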
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: timeWindow
Windows this KeyedStream into tumbling time windows.
This is a shortcut for either .window(TumblingEventTimeWindows.of(size)) or .window(TumblingProcessingTimeWindows.of(size)), depending on the time characteristic set using org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic).
The two-argument overload timeWindow(size, slide), which several of the examples in this article use, produces sliding windows instead.
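To make "tumbling time windows" concrete, the following plain-JDK sketch shows the arithmetic that assigns a timestamp to exactly one fixed-size, non-overlapping window. It is an illustration only, not Flink code: the class TumblingWindowSketch and its methods are hypothetical names, and the sketch assumes windows aligned to the epoch with no offset.

```java
// Plain-JDK illustration; TumblingWindowSketch is a hypothetical name, not part of Flink.
class TumblingWindowSketch {

    /** Start (inclusive) of the tumbling window of length sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the remainder non-negative, so negative timestamps align correctly too.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // same length as Time.seconds(5)
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because each timestamp maps to a single window start, an element is counted once per key, which is what distinguishes tumbling windows from the sliding variant.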
Code example source: apache/flink
/**
 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time)}.
 *
 * @param size The size of the window.
 * @return The python windowed stream {@link PythonWindowedStream}
 */
public PythonWindowedStream time_window(Time size) {
    return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size));
}
Code example source: apache/flink
/**
 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time, Time)}.
 *
 * @param size The size of the window.
 * @param slide The slide interval of the window.
 * @return The python wrapper {@link PythonWindowedStream}
 */
public PythonWindowedStream time_window(Time size, Time slide) {
    return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size, slide));
}
Code example source: apache/flink
static WindowedStream<Event, Integer, TimeWindow> applyTumblingWindows(
        KeyedStream<Event, Integer> keyedStream, ParameterTool pt) {
    long eventTimeProgressPerEvent = pt.getLong(
        SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
        SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue());
    // window length in ms = configured number of events * event-time progress per event
    return keyedStream.timeWindow(
        Time.milliseconds(
            pt.getLong(
                TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.key(),
                TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.defaultValue()
            ) * eventTimeProgressPerEvent
        )
    );
}
Code example source: apache/flink
.timeWindow(Time.seconds(5))
Code example source: apache/flink
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
    stream
        .keyBy(0)
        // sliding windows: 2500 ms long, advancing every 500 ms
        .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
        .reduce(new SummingReducer())
        // alternative: use an apply function which does not pre-aggregate
        // .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
        // .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
        // .apply(new SummingWindowFunction())
        .addSink(new SinkFunction<Tuple2<Long, Long>>() {
            @Override
            public void invoke(Tuple2<Long, Long> value) {
            }
        });
    env.execute();
}
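The example above passes both a size (2500 ms) and a slide (500 ms), so each element falls into several overlapping windows. The plain-JDK sketch below illustrates that assignment under the assumption of epoch-aligned windows with no offset. SlidingWindowSketch and assignWindows are hypothetical names chosen for illustration and are not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration; SlidingWindowSketch is a hypothetical name, not part of Flink.
class SlidingWindowSketch {

    /** Returns the {start, end} pairs (end exclusive) of all sliding windows containing timestampMs. */
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide interval.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same size and slide as the example above: 2500 ms windows every 500 ms.
        for (long[] w : assignWindows(1_200L, 2_500L, 500L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With these parameters every timestamp belongs to size / slide = 5 windows, which is why a pre-aggregating reduce function is usually cheaper here than buffering every element per window. When size equals slide, the loop yields a single window and the behaviour matches a tumbling window.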
Code example source: apache/flink
@Test
public void testApplyWindowState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
    DataStream<File> src = env.fromElements(new File("/"));
    SingleOutputStreamOperator<?> result = src
        .keyBy(new KeySelector<File, String>() {
            @Override
            public String getKey(File value) {
                return null;
            }
        })
        .timeWindow(Time.milliseconds(1000))
        .apply(new WindowFunction<File, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window,
                    Iterable<File> input, Collector<String> out) {}
        });
    validateListStateDescriptorConfigured(result);
}
Code example source: apache/flink
@Test
public void testReduceWindowState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
    DataStream<File> src = env.fromElements(new File("/"));
    SingleOutputStreamOperator<?> result = src
        .keyBy(new KeySelector<File, String>() {
            @Override
            public String getKey(File value) {
                return null;
            }
        })
        .timeWindow(Time.milliseconds(1000))
        .reduce(new ReduceFunction<File>() {
            @Override
            public File reduce(File value1, File value2) {
                return null;
            }
        });
    validateStateDescriptorConfigured(result);
}
Code example source: apache/flink
@Test
public void testProcessWindowState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
    DataStream<File> src = env.fromElements(new File("/"));
    SingleOutputStreamOperator<?> result = src
        .keyBy(new KeySelector<File, String>() {
            @Override
            public String getKey(File value) {
                return null;
            }
        })
        .timeWindow(Time.milliseconds(1000))
        .process(new ProcessWindowFunction<File, String, String, TimeWindow>() {
            @Override
            public void process(String s, Context ctx,
                    Iterable<File> input, Collector<String> out) {}
        });
    validateListStateDescriptorConfigured(result);
}
Code example source: apache/flink
@Test
public void testFoldWindowState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
    DataStream<String> src = env.fromElements("abc");
    SingleOutputStreamOperator<?> result = src
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) {
                return null;
            }
        })
        .timeWindow(Time.milliseconds(1000))
        .fold(new File("/"), new FoldFunction<String, File>() {
            @Override
            public File fold(File a, String e) {
                return null;
            }
        });
    validateStateDescriptorConfigured(result);
}
Code example source: apache/flink
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    String outputPath = params.getRequired("outputPath");
    int recordsPerSecond = params.getInt("recordsPerSecond", 10);
    int duration = params.getInt("durationInSecond", 60);
    int offset = params.getInt("offsetInSecond", 0);
    StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    sEnv.enableCheckpointing(4000);
    sEnv.getConfig().setAutoWatermarkInterval(1000);
    // execute a simple pass through program.
    PeriodicSourceGenerator generator = new PeriodicSourceGenerator(
        recordsPerSecond, duration, offset);
    DataStream<Tuple> rows = sEnv.addSource(generator);
    // key by field 1, then sum field 0 within 5-second tumbling windows
    DataStream<Tuple> result = rows
        .keyBy(1)
        .timeWindow(Time.seconds(5))
        .sum(0);
    result.writeAsText(outputPath + "/result.txt", FileSystem.WriteMode.OVERWRITE)
        .setParallelism(1);
    sEnv.execute();
}
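The keyBy(1).timeWindow(Time.seconds(5)).sum(0) chain above groups records by key and by 5-second window before summing. The plain-JDK sketch below imitates that grouping on an in-memory array so the result can be inspected without a cluster. It is a simplified simulation under stated assumptions: KeyedTumblingSumSketch is a hypothetical name, timestamps are supplied explicitly (in the processing-time job above they would come from the wall clock), and there are no triggers, state backends, or parallelism.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-JDK simulation; KeyedTumblingSumSketch is a hypothetical name, not part of Flink.
class KeyedTumblingSumSketch {

    /**
     * Sums values per (key, tumbling window).
     * Each event is {timestampMs, key, value}; the map key is "key=<k> window=<start>".
     */
    static Map<String, Long> sumPerKeyAndWindow(long[][] events, long sizeMs) {
        Map<String, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            // Epoch-aligned start of the window that contains this event.
            long windowStart = e[0] - Math.floorMod(e[0], sizeMs);
            String bucket = "key=" + e[1] + " window=" + windowStart;
            // Incremental aggregation: only the running sum is kept per bucket.
            sums.merge(bucket, e[2], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] events = {
            {1_000L, 1L, 10L},
            {4_000L, 1L, 5L},
            {6_000L, 1L, 7L},
            {2_000L, 2L, 3L},
        };
        sumPerKeyAndWindow(events, 5_000L)
            .forEach((bucket, sum) -> System.out.println(bucket + " sum=" + sum));
    }
}
```

The first two events for key 1 share the window starting at 0 and are summed together, while the third event lands in the next window and starts a new sum.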
Code example source: apache/flink
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(reducer);
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
Code example source: apache/flink
@Test
public void testProcessdWindowFunctionSideOutput() throws Exception {
    TestListResultSink<Integer> resultSink = new TestListResultSink<>();
    TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setParallelism(3);
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Integer> dataStream = see.fromCollection(elements);
    OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
    SingleOutputStreamOperator<Integer> windowOperator = dataStream
        .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
        .keyBy(new TestKeySelector())
        .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
        .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                // emit the key on the main output and a tagged copy on the side output
                out.collect(integer);
                context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
            }
        });
    windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
    windowOperator.addSink(resultSink);
    see.execute();
    assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
    assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}
Code example source: apache/flink
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateDataTag)
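The fragment above combines allowedLateness with sideOutputLateData, so elements whose window has already expired go to a side output instead of being dropped silently. The plain-JDK sketch below shows one way to express that expiry check. It rests on an assumption about event-time semantics: a window is treated as expired once the watermark reaches the window's last timestamp plus the allowed lateness. LatenessSketch and isWindowLate are hypothetical names, so check the Flink documentation for the authoritative rule.

```java
// Plain-JDK illustration; LatenessSketch is a hypothetical name, not part of Flink.
class LatenessSketch {

    /**
     * Assumed rule: the window [start, windowEndMs) is expired once the watermark has
     * reached (windowEndMs - 1) + allowedLatenessMs.
     */
    static boolean isWindowLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1;                  // last timestamp inside the window
        long cleanupTime = maxTimestamp + allowedLatenessMs;  // point at which the window is discarded
        return cleanupTime <= watermarkMs;
    }

    public static void main(String[] args) {
        // 1 ms window [3, 4) with 2 ms of allowed lateness, as in the fragment above.
        long windowEnd = 4L;
        long allowedLateness = 2L;
        for (long watermark = 3L; watermark <= 6L; watermark++) {
            System.out.println("watermark=" + watermark
                + " late=" + isWindowLate(windowEnd, allowedLateness, watermark));
        }
    }
}
```

Under this assumed rule, an element for window [3, 4) would still be accepted while the watermark is at 4 and would be routed to the late-data side output once the watermark reaches 5.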
Code example source: apache/flink
(KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
TypeInformation.of(Integer.class))
.timeWindow(Time.seconds(1)) // test that also timers and aggregated state work as expected
.reduce((ReduceFunction<Tuple2<Integer, Integer>>) (value1, value2) ->
new Tuple2<>(value1.f0, value1.f1 + value2.f1))
Code example source: apache/flink
.rebalance()
.keyBy(0)
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {
Code example source: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testReduceEventTimeWindows() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(
        Tuple2.of("hello", 1),
        Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .keyBy(0)
        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
        .reduce(new DummyReducer());
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
    Assert.assertTrue(operator1 instanceof WindowOperator);
    WindowOperator winOperator1 = (WindowOperator) operator1;
    Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
    Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
    Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
}
Code example source: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testApplyEventTimeWindows() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(
        Tuple2.of("hello", 1),
        Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .keyBy(0)
        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
        .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void apply(Tuple tuple,
                    TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
            }
        });
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
    Assert.assertTrue(operator1 instanceof WindowOperator);
    WindowOperator winOperator1 = (WindowOperator) operator1;
    Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
    Assert.assertTrue(winOperator1.getWindowAssigner() instanceof TumblingEventTimeWindows);
    Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
}
Code example source: apache/flink
@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTimeWindows() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(
        Tuple2.of("hello", 1),
        Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .keyBy(0)
        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
        .fold(new Tuple2<>("", 1), new DummyFolder());
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
    Assert.assertTrue(operator1 instanceof WindowOperator);
    WindowOperator winOperator1 = (WindowOperator) operator1;
    Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
    Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
    Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
}
Code example source: apache/flink
.rebalance()
.keyBy(0)
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
.reduce(new ReduceFunction<Tuple2<Long, IntType>>() {
@Override
Code example source: apache/flink
.rebalance()
.keyBy(0)
.timeWindow(Time.of(100, MILLISECONDS))
.reduce(new ReduceFunction<Tuple2<Long, IntType>>() {