org.apache.flink.streaming.api.datastream.AllWindowedStream.process()方法的使用及代码示例

x33g5p2x  于2022-01-17 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(140)

本文整理了Java中org.apache.flink.streaming.api.datastream.AllWindowedStream.process()方法的一些代码示例,展示了AllWindowedStream.process()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AllWindowedStream.process()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.AllWindowedStream
类名称:AllWindowedStream
方法名:process

AllWindowedStream.process介绍

[英]Applies the given window function to each window. The window function is called for each evaluation of the window. The output of the window function is interpreted as a regular non-windowed stream.

Not that this function requires that all data in the windows is buffered until the window is evaluated, as the function provides no means of incremental aggregation.
[中]将给定的窗口函数应用于每个窗口。对窗口的每次求值都调用window函数。窗口函数的输出被解释为常规的非窗口流。
并不是说该函数要求在计算窗口之前缓冲窗口中的所有数据,因为该函数不提供增量聚合方法。

代码示例

代码示例来源:origin: apache/flink

@Test
public void testProcessAllWindowState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<File> src = env.fromElements(new File("/"));
  SingleOutputStreamOperator<?> result = src
      .timeWindowAll(Time.milliseconds(1000))
      .process(new ProcessAllWindowFunction<File, String, TimeWindow>() {
        @Override
        public void process(Context ctx, Iterable<File> input, Collector<String> out) {}
      });
  validateListStateDescriptorConfigured(result);
}

代码示例来源:origin: apache/flink

.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
.process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(
            Context ctx,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testMergingWindowsWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .evictor(CountEvictor.of(5))
      .process(new TestProcessAllWindowFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testProcessEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(
            Context ctx,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testProcessWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .trigger(CountTrigger.of(1))
      .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(
            Context ctx,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

相关文章