org.apache.flink.streaming.api.datastream.AllWindowedStream.evictor()方法的使用及代码示例

x33g5p2x  于2022-01-17 转载在 其他  
字(10.0k)|赞(0)|评价(0)|浏览(98)

本文整理了Java中org.apache.flink.streaming.api.datastream.AllWindowedStream.evictor()方法的一些代码示例,展示了AllWindowedStream.evictor()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AllWindowedStream.evictor()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.AllWindowedStream
类名称:AllWindowedStream
方法名:evictor

AllWindowedStream.evictor介绍

[英]The evictor that is used for evicting elements before window evaluation.
[中]用于在窗口求值之前逐出图元的逐出器。

代码示例

代码示例来源:origin: apache/flink

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testMergingWindowsWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .evictor(CountEvictor.of(5))
      .process(new TestProcessAllWindowFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
public void testAggregateWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .aggregate(new DummyAggregationFunction());
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(
      winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .reduce(reducer);
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

.evictor(CountEvictor.of(100))
.aggregate(
    new DummyAggregationFunction(),

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

.evictor(CountEvictor.of(100))
.reduce(
    reducer,

代码示例来源:origin: apache/flink

.evictor(CountEvictor.of(100))
.fold(
    new Tuple3<>("", "", 1),

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation can be inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only for special cases, such as aligned time windows is
 * it possible to perform this operation in parallel).
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}

相关文章