Usage and code examples of the org.apache.flink.streaming.api.datastream.KeyedStream.timeWindow() method

x33g5p2x, reposted on 2022-01-23 in: Other

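This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.timeWindow() and shows how KeyedStream.timeWindow() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as a useful reference. Details of the KeyedStream.timeWindow() method:
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: timeWindow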

Introduction to KeyedStream.timeWindow

Windows this KeyedStream into tumbling time windows.

This is a shortcut for either .window(TumblingEventTimeWindows.of(size)) or .window(TumblingProcessingTimeWindows.of(size)), depending on the time characteristic set using org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic).

The two-argument overload timeWindow(size, slide), used in several of the examples in this article, creates sliding time windows instead.
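
The dispatch described above can be modeled in plain Java without a Flink dependency. This is only a sketch: the class TimeWindowDispatchSketch, its nested enum, and both methods are hypothetical stand-ins, and the assigner names are returned as strings rather than real Flink objects. The mapping of IngestionTime to the event-time assigners is inferred from the test examples in this article, which set IngestionTime and then assert TumblingEventTimeWindows or SlidingEventTimeWindows.

```java
// Plain-Java model of how timeWindow(...) chooses a window assigner.
// Nothing here is Flink code; all names are illustrative stand-ins.
final class TimeWindowDispatchSketch {

    /** Stand-in for Flink's TimeCharacteristic enum. */
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /** Name of the assigner that the one-argument timeWindow(size) would select. */
    static String tumblingAssignerFor(TimeCharacteristic characteristic) {
        // Only ProcessingTime selects the processing-time assigner;
        // every other characteristic falls through to the event-time assigner.
        return characteristic == TimeCharacteristic.ProcessingTime
                ? "TumblingProcessingTimeWindows"
                : "TumblingEventTimeWindows";
    }

    /** Name of the assigner that the two-argument timeWindow(size, slide) would select. */
    static String slidingAssignerFor(TimeCharacteristic characteristic) {
        return characteristic == TimeCharacteristic.ProcessingTime
                ? "SlidingProcessingTimeWindows"
                : "SlidingEventTimeWindows";
    }

    public static void main(String[] args) {
        // Print the selection for every characteristic.
        for (TimeCharacteristic c : TimeCharacteristic.values()) {
            System.out.println(c + " -> " + tumblingAssignerFor(c)
                    + " / " + slidingAssignerFor(c));
        }
    }
}
```

Returning names instead of assigner instances keeps the sketch runnable on its own; in a real job the choice is made inside Flink when timeWindow is called.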

Code examples

Code example source: origin: apache/flink

/**
 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time)}.
 *
 * @param size The size of the window.
 * @return The python windowed stream {@link PythonWindowedStream}
 */
public PythonWindowedStream time_window(Time size) {
  return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size));
}

Code example source: origin: apache/flink

/**
 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time, Time)}.
 *
 * @param size The size of the window.
 * @param slide The slide interval of the window.
 * @return The python wrapper {@link PythonWindowedStream}
 */
public PythonWindowedStream time_window(Time size, Time slide) {
  return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size, slide));
}

Code example source: origin: apache/flink

static WindowedStream<Event, Integer, TimeWindow> applyTumblingWindows(
    KeyedStream<Event, Integer> keyedStream, ParameterTool pt) {
  long eventTimeProgressPerEvent = pt.getLong(
    SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
    SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue());
  return keyedStream.timeWindow(
    Time.milliseconds(
      pt.getLong(
        TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.key(),
        TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.defaultValue()
      ) * eventTimeProgressPerEvent
    )
  );
}
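
The example above derives the tumbling window size from a number of events multiplied by the event-time progress per event. How a tumbling window of a given size then buckets timestamps can be sketched in plain Java. This is a minimal illustration that assumes a zero window offset and non-negative millisecond timestamps; TumblingWindowSketch and its methods are hypothetical names, not Flink's implementation.

```java
// Plain-Java sketch of tumbling-window bucketing; no Flink dependency.
// Assumptions: window offset is zero and timestamps are non-negative.
final class TumblingWindowSketch {

    /** Inclusive start of the tumbling window of length sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // Round the timestamp down to the nearest multiple of the window size.
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the tumbling window that contains timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 5_000L; // corresponds to Time.seconds(5)
        long[] timestamps = {0L, 4_999L, 5_000L, 12_345L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs)
                    + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

With a 5,000 ms window, the timestamp 12,345 lands in the window [10000, 15000). Every timestamp belongs to exactly one tumbling window, which is why the windows never overlap.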

Code example source: origin: apache/flink

.timeWindow(Time.seconds(5))

Code example source: origin: apache/flink

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());

    stream
      .keyBy(0)
      .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
      .reduce(new SummingReducer())

      // alternative: use an apply function which does not pre-aggregate
//            .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
//            .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
//            .apply(new SummingWindowFunction())

      .addSink(new SinkFunction<Tuple2<Long, Long>>() {
        @Override
        public void invoke(Tuple2<Long, Long> value) {
        }
      });

    env.execute();
  }
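
The example above uses a sliding window of 2500 ms that advances every 500 ms, so each element contributes to several overlapping windows. The following plain-Java sketch lists the windows that contain a given timestamp. It assumes a zero window offset and non-negative timestamps; SlidingWindowSketch and windowStarts are hypothetical names chosen for illustration, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment; no Flink dependency.
// Assumptions: window offset is zero and timestamps are non-negative.
final class SlidingWindowSketch {

    /**
     * Returns the start of every window [start, start + sizeMs) that contains
     * timestampMs, from the latest window to the earliest. Starts can be
     * negative for timestamps smaller than the window size.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size and slide match the example above: 2500 ms and 500 ms.
        System.out.println(windowStarts(3_200L, 2_500L, 500L));
    }
}
```

For the timestamp 3200 the sketch yields the starts 3000, 2500, 2000, 1500 and 1000, that is size / slide = 5 windows. This overlap is why pre-aggregating with reduce, as the example does, is cheaper than buffering every element in each window.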

Code example source: origin: apache/flink

@Test
public void testApplyWindowState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<File> src = env.fromElements(new File("/"));
  SingleOutputStreamOperator<?> result = src
      .keyBy(new KeySelector<File, String>() {
        @Override
        public String getKey(File value) {
          return null;
        }
      })
      .timeWindow(Time.milliseconds(1000))
      .apply(new WindowFunction<File, String, String, TimeWindow>() {
        @Override
        public void apply(String s, TimeWindow window,
                  Iterable<File> input, Collector<String> out) {}
      });
  validateListStateDescriptorConfigured(result);
}

Code example source: origin: apache/flink

@Test
public void testReduceWindowState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<File> src = env.fromElements(new File("/"));
  SingleOutputStreamOperator<?> result = src
      .keyBy(new KeySelector<File, String>() {
        @Override
        public String getKey(File value) {
          return null;
        }
      })
      .timeWindow(Time.milliseconds(1000))
      .reduce(new ReduceFunction<File>() {
        @Override
        public File reduce(File value1, File value2) {
          return null;
        }
      });
  validateStateDescriptorConfigured(result);
}

Code example source: origin: apache/flink

@Test
public void testProcessWindowState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<File> src = env.fromElements(new File("/"));
  SingleOutputStreamOperator<?> result = src
      .keyBy(new KeySelector<File, String>() {
        @Override
        public String getKey(File value) {
          return null;
        }
      })
      .timeWindow(Time.milliseconds(1000))
      .process(new ProcessWindowFunction<File, String, String, TimeWindow>() {
        @Override
        public void process(String s, Context ctx,
            Iterable<File> input, Collector<String> out) {}
      });
  validateListStateDescriptorConfigured(result);
}

Code example source: origin: apache/flink

@Test
public void testFoldWindowState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<String> src = env.fromElements("abc");
  SingleOutputStreamOperator<?> result = src
      .keyBy(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
          return null;
        }
      })
      .timeWindow(Time.milliseconds(1000))
      .fold(new File("/"), new FoldFunction<String, File>() {
        @Override
        public File fold(File a, String e) {
          return null;
        }
      });
  validateStateDescriptorConfigured(result);
}

Code example source: origin: apache/flink

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  String outputPath = params.getRequired("outputPath");
  int recordsPerSecond = params.getInt("recordsPerSecond", 10);
  int duration = params.getInt("durationInSecond", 60);
  int offset = params.getInt("offsetInSecond", 0);
  StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  sEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  sEnv.enableCheckpointing(4000);
  sEnv.getConfig().setAutoWatermarkInterval(1000);
  // execute a simple pass through program.
  PeriodicSourceGenerator generator = new PeriodicSourceGenerator(
    recordsPerSecond, duration, offset);
  DataStream<Tuple> rows = sEnv.addSource(generator);
  DataStream<Tuple> result = rows
    .keyBy(1)
    .timeWindow(Time.seconds(5))
    .sum(0);
  result.writeAsText(outputPath + "/result.txt", FileSystem.WriteMode.OVERWRITE)
    .setParallelism(1);
  sEnv.execute();
}

Code example source: origin: apache/flink

.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(reducer);
// ... (code between the two snippets is omitted in this excerpt)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
  private static final long serialVersionUID = 1L;

Code example source: origin: apache/flink

@Test
public void testProcessdWindowFunctionSideOutput() throws Exception {
  TestListResultSink<Integer> resultSink = new TestListResultSink<>();
  TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.setParallelism(3);
  see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Integer> dataStream = see.fromCollection(elements);
  OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
  SingleOutputStreamOperator<Integer> windowOperator = dataStream
      .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
      .keyBy(new TestKeySelector())
      .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
      .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
          out.collect(integer);
          context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
        }
      });
  windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
  windowOperator.addSink(resultSink);
  see.execute();
  assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
  assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}

Code example source: origin: apache/flink

.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateDataTag)
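
The fragment above combines allowedLateness with sideOutputLateData. As a rough plain-Java sketch of the event-time rule, an element counts as late once the watermark has reached the last timestamp of its window plus the allowed lateness. This reflects my understanding of the behavior and is an assumption rather than Flink's code; LatenessSketch and isWindowLate are hypothetical names, and overflow handling is left out.

```java
// Plain-Java sketch of the event-time lateness check; no Flink dependency.
// Assumption: a window [start, end) is dropped once the watermark reaches
// (end - 1) + allowedLateness. Overflow handling is intentionally omitted.
final class LatenessSketch {

    /** True if a window ending (exclusively) at windowEndMs is already late at watermarkMs. */
    static boolean isWindowLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1;                 // last timestamp inside the window
        long cleanupTime = maxTimestamp + allowedLatenessMs; // window state is kept until here
        return cleanupTime <= watermarkMs;
    }

    public static void main(String[] args) {
        // Window [5, 6) with 2 ms allowed lateness, as in the fragment above.
        for (long watermark = 5; watermark <= 8; watermark++) {
            System.out.println("watermark=" + watermark
                    + " late=" + isWindowLate(6L, 2L, watermark));
        }
    }
}
```

Under this rule, an element for the window [5, 6) is still accepted while the watermark is below 7. Once the watermark reaches 7, the element would be routed to the side output identified by lateDataTag instead of the main result.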

Code example source: origin: apache/flink

(KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
TypeInformation.of(Integer.class))
.timeWindow(Time.seconds(1)) // test that also timers and aggregated state work as expected
.reduce((ReduceFunction<Tuple2<Integer, Integer>>) (value1, value2) ->
  new Tuple2<>(value1.f0, value1.f1 + value2.f1))

Code example source: origin: apache/flink

.rebalance()
.keyBy(0)
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {

Code example source: origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceEventTimeWindows() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(
      Tuple2.of("hello", 1),
      Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .keyBy(0)
      .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
      .reduce(new DummyReducer());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
  Assert.assertTrue(operator1 instanceof WindowOperator);
  WindowOperator winOperator1 = (WindowOperator) operator1;
  Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
}

Code example source: origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyEventTimeWindows() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(
      Tuple2.of("hello", 1),
      Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
    .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
      private static final long serialVersionUID = 1L;
      @Override
      public void apply(Tuple tuple,
        TimeWindow window,
        Iterable<Tuple2<String, Integer>> values,
        Collector<Tuple2<String, Integer>> out) throws Exception {
      }
    });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
  Assert.assertTrue(operator1 instanceof WindowOperator);
  WindowOperator winOperator1 = (WindowOperator) operator1;
  Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator1.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
}

Code example source: origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTimeWindows() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(
      Tuple2.of("hello", 1),
      Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .keyBy(0)
      .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
      .fold(new Tuple2<>("", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
  Assert.assertTrue(operator1 instanceof WindowOperator);
  WindowOperator winOperator1 = (WindowOperator) operator1;
  Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
}

Code example source: origin: apache/flink

.rebalance()
.keyBy(0)
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
.reduce(new ReduceFunction<Tuple2<Long, IntType>>() {
  @Override

Code example source: origin: apache/flink

.rebalance()
.keyBy(0)
.timeWindow(Time.of(100, MILLISECONDS))
.reduce(new ReduceFunction<Tuple2<Long, IntType>>() {
