Usage and code examples of the org.apache.flink.streaming.api.datastream.KeyedStream.window() method

x33g5p2x, reposted on 2022-01-23 under "Other"

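This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.window() and shows how KeyedStream.window() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of the KeyedStream.window() method:
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: window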

Introduction to KeyedStream.window

Windows this data stream to a WindowedStream, which evaluates windows over a key-grouped stream. Elements are put into windows by a WindowAssigner. The grouping of elements is done both by key and by window.

A Trigger (org.apache.flink.streaming.api.windowing.triggers.Trigger) can be defined to specify when windows are evaluated. However, WindowAssigners have a default Trigger that is used if a Trigger is not specified.
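
To make "grouped both by key and by window" concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes tumbling windows aligned to timestamp 0, and every name in it (KeyAndWindowSketch, Event, windowStart, groupByKeyAndWindow) is hypothetical; Flink's real assigners and triggers are considerably more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names are Flink APIs.
final class KeyAndWindowSketch {

    // A hypothetical event: a key, a value, and an event timestamp in milliseconds.
    static final class Event {
        final String key;
        final int value;
        final long timestamp;

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window (aligned to 0) that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Groups values by the pair (key, window start): one bucket per key per window.
    static Map<String, List<Integer>> groupByKeyAndWindow(List<Event> events, long size) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            String bucket = e.key + "@" + windowStart(e.timestamp, size);
            groups.computeIfAbsent(bucket, k -> new ArrayList<>()).add(e.value);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 1, 1_000L));
        events.add(new Event("a", 2, 4_000L));
        events.add(new Event("b", 3, 4_500L));
        events.add(new Event("a", 4, 6_000L));
        // With 5-second windows, key "a" is split across two windows.
        System.out.println(groupByKeyAndWindow(events, 5_000L));
    }
}
```

The sketch only covers assignment; deciding when a bucket is evaluated is the trigger's job, which is omitted here.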

Code examples

Code example source: apache/flink

/**
 * Windows this {@code KeyedStream} into sliding time windows.
 *
 * <p>This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time
 * characteristic set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 * @param slide The slide interval of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
  if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
    return window(SlidingProcessingTimeWindows.of(size, slide));
  } else {
    return window(SlidingEventTimeWindows.of(size, slide));
  }
}
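
With sliding windows, one element usually belongs to several overlapping windows. The following plain-Java sketch illustrates that arithmetic under the assumption of zero offset; the class and method names (SlidingWindowSketch, windowStarts) are hypothetical, and this is not Flink's assigner code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; not part of the Flink API.
final class SlidingWindowSketch {

    // Returns the start of every window [start, start + size) that contains
    // the timestamp, newest window first. Windows begin at multiples of slide.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 1-second window sliding every 100 ms: each element lands in 10 windows.
        System.out.println(windowStarts(250L, 1_000L, 100L).size());
    }
}
```

When size is a multiple of slide, each element falls into size / slide windows, which is why small slide intervals multiply the amount of window state.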

Code example source: apache/flink

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
  if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
    return window(TumblingProcessingTimeWindows.of(size));
  } else {
    return window(TumblingEventTimeWindows.of(size));
  }
}

Code example source: apache/flink

/**
 * Windows this {@code KeyedStream} into tumbling count windows.
 *
 * @param size The size of the windows in number of elements.
 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
  return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
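
The method above combines a single global window per key with a count trigger wrapped in a purging trigger: once `size` elements have arrived, the window fires and its contents are cleared. A rough plain-Java simulation of that behavior is sketched below; the class TumblingCountWindowSketch and its add method are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java illustration only; simulates "fire and purge every N elements per key".
final class TumblingCountWindowSketch {
    private final int size;
    // One buffer per key, standing in for the per-key global window.
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    TumblingCountWindowSketch(int size) {
        this.size = size;
    }

    // Adds an element; returns the window contents if the window fires.
    Optional<List<Integer>> add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return Optional.empty(); // count not reached yet
        }
        buffers.remove(key); // purge: the next element starts a fresh window
        return Optional.of(buffer);
    }

    public static void main(String[] args) {
        TumblingCountWindowSketch window = new TumblingCountWindowSketch(2);
        System.out.println(window.add("hello", 1)); // does not fire
        System.out.println(window.add("hello", 2)); // fires with both elements
    }
}
```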

Code example source: apache/flink

/**
 * Windows this {@code KeyedStream} into sliding count windows.
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
  return window(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}
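
For the sliding variant, the trigger fires every `slide` elements without purging, and the evictor trims the window to the most recent `size` elements before it is evaluated. The plain-Java sketch below simulates that combination under those assumptions; SlidingCountWindowSketch and its add method are hypothetical names, not Flink APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java illustration only; simulates "keep last `size`, fire every `slide`".
final class SlidingCountWindowSketch {
    private final int size;
    private final int slide;
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    SlidingCountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Adds an element; returns the evaluated window contents if the trigger fires.
    Optional<List<Integer>> add(String key, int value) {
        Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(value);
        int seen = sinceLastFire.merge(key, 1, Integer::sum);
        if (seen < slide) {
            return Optional.empty(); // trigger has not fired yet
        }
        sinceLastFire.put(key, 0); // reset the trigger count, keep the buffer
        while (buffer.size() > size) {
            buffer.removeFirst(); // evict the oldest elements beyond `size`
        }
        return Optional.of(new ArrayList<>(buffer));
    }

    public static void main(String[] args) {
        SlidingCountWindowSketch window = new SlidingCountWindowSketch(3, 2);
        for (int i = 1; i <= 6; i++) {
            System.out.println(i + " -> " + window.add("hello", i));
        }
    }
}
```

Note that the first firing can contain fewer than `size` elements, because the trigger counts arrivals and does not wait for the window to fill.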

Code example source: apache/flink

/**
 * .aggregate() does not support RichAggregateFunction, since the AggregateFunction is used internally
 * in an {@code AggregatingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testAggregateWithRichFunctionFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>());
  fail("exception was not thrown");
}

Code example source: apache/flink

/**
 * .reduce() does not support RichReduceFunction, since the reduce function is used internally
 * in a {@code ReducingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testReduceWithRichReducerFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
    .keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
    .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
        Tuple2<String, Integer> value2) throws Exception {
        return null;
      }
    });
  fail("exception was not thrown");
}

Code example source: apache/flink

/**
 * .fold() does not support RichFoldFunction, since the fold function is used internally
 * in a {@code FoldingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
            Tuple2<String, Integer> value2) throws Exception {
          return null;
        }
      });
  fail("exception was not thrown");
}

Code example source: apache/flink

@Test
public void testErrorOnEventTimeWithoutTimestamps() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  env.getConfig().disableSysoutLogging();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Tuple2<String, Integer>> source1 =
      env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
  source1
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
          return value1;
        }
      })
      .print();
  try {
    env.execute();
    fail("this should fail with an exception");
  } catch (Exception e) {
    // expected
  }
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .keyBy(new TupleKeySelector())
      .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .reduce(new DummyReducer());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .keyBy(new TupleKeySelector())
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .reduce(new DummyReducer());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateWithProcessWindowFunctionEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
    Tuple3.of("hello", "hallo", 1),
    Tuple3.of("hello", "hallo", 2));
  DataStream<String> window = source
      .keyBy(new Tuple3KeySelector())
      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
  final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
    (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
  final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
    (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}

Code example source: apache/flink

@Test
public void testAggregateProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
    Tuple3.of("hello", "hallo", 1),
    Tuple3.of("hello", "hallo", 2));
  DataStream<Integer> window1 = source
      .keyBy(new Tuple3KeySelector())
      .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .aggregate(new DummyAggregationFunction());
  final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform =
    (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation();
  final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<Tuple2<String, Integer>> window1 = source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .trigger(CountTrigger.of(1))
      .reduce(reducer);
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window = source
      .keyBy(new TupleKeySelector())
      .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple3<>("", "", 0), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String,  Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .trigger(CountTrigger.of(1))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateWithWindowFunctionEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
    Tuple3.of("hello", "hallo", 1),
    Tuple3.of("hello", "hallo", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<String> window = source
      .keyBy(new Tuple3KeySelector())
      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .aggregate(new DummyAggregationFunction(), new TestWindowFunction());
  final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
    (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
  final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
    (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
  processElementAndEnsureOutput(
      operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<Tuple2<String, Integer>> window1 = source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .reduce(reducer);
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example source: apache/flink

@Test
public void testAggregateWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
    Tuple3.of("hello", "hallo", 1),
    Tuple3.of("hello", "hallo", 2));
  DataStream<Integer> window1 = source
      .keyBy(new Tuple3KeySelector())
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .aggregate(new DummyAggregationFunction());
  final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform =
    (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation();
  final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}

Code example source: apache/flink

@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
