Usage and code examples of the org.apache.flink.streaming.api.datastream.AllWindowedStream.trigger() method

Reposted by x33g5p2x on 2022-01-17, filed under Other

This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.AllWindowedStream.trigger() and shows how AllWindowedStream.trigger() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the AllWindowedStream.trigger() method:
Package: org.apache.flink.streaming.api.datastream
Class name: AllWindowedStream
Method name: trigger

About AllWindowedStream.trigger

The trigger that is used for window evaluation/emission.

Code examples

Code example origin: apache/flink

/**
 * Windows this {@code DataStream} into tumbling count windows.
 *
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 *
 * @param size The size of the windows in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
  return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
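
The method above wraps CountTrigger in PurgingTrigger. The effect of that wrapping can be illustrated with a small plain-Java model. This is only a sketch of the semantics, not Flink code: the class CountTriggerSketch and its run method are hypothetical names, and the model assumes a single global window that fires every `size` elements and either keeps or clears its contents afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a count trigger on one global window (hypothetical, not Flink code). */
final class CountTriggerSketch {

    /**
     * Buffers the input and "fires" every {@code size} elements.
     * With {@code purge == true} the buffer is cleared after each firing,
     * which models wrapping the count trigger in a purging trigger.
     */
    static List<List<Integer>> run(List<Integer> input, int size, boolean purge) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element : input) {
            buffer.add(element);
            sinceLastFire++;
            if (sinceLastFire == size) {
                emitted.add(new ArrayList<>(buffer)); // fire: emit a snapshot of the window
                sinceLastFire = 0;                    // the count restarts after each firing
                if (purge) {
                    buffer.clear();                   // purge: drop the window contents
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(run(input, 3, true));  // [[1, 2, 3], [4, 5, 6]]
        System.out.println(run(input, 3, false)); // [[1, 2, 3], [1, 2, 3, 4, 5, 6]]
    }
}
```

With purging, each firing yields a fresh, non-overlapping batch, which is the tumbling behavior. Without purging, the global window keeps growing and every firing re-emits all earlier elements.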

Code example origin: apache/flink

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation is inherently non-parallel since all elements have to pass through
 * the same operator instance.
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}
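
The sliding variant above combines a non-purging count trigger (fires every `slide` elements) with a count evictor (keeps at most `size` elements). A plain-Java model of that interplay might look like the following. It is a sketch under simplifying assumptions, not Flink code: SlidingCountSketch and run are hypothetical names, and eviction is modeled as trimming the buffer to its newest `size` elements just before each firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a sliding count window (hypothetical, not Flink code). */
final class SlidingCountSketch {

    /** Fires every {@code slide} elements and emits at most the newest {@code size} elements. */
    static List<List<Integer>> run(List<Integer> input, int size, int slide) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element : input) {
            buffer.add(element);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                sinceLastFire = 0;
                // Evictor step: drop the oldest elements until only `size` remain.
                while (buffer.size() > size) {
                    buffer.remove(0);
                }
                emitted.add(new ArrayList<>(buffer)); // fire without purging
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(run(input, 4, 2)); // [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

The first firings contain fewer than `size` elements because the window has not filled up yet; once it has, consecutive results overlap by `size - slide` elements.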

Code example origin: apache/flink

windowedStream.trigger(new Trigger<String, TimeWindow>() {
  private static final long serialVersionUID = 6558046711583024443L;
  // ...

Code example origin: apache/flink

.trigger(PurgingTrigger.of(CountTrigger.of(10)))
.fold(0L, new FoldFunction<Long, Long>() {
  private static final long serialVersionUID = 1L;
  // ...

Code example origin: apache/flink

.trigger(PurgingTrigger.of(CountTrigger.of(5)))
.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
  @Override
  // ...

.trigger(PurgingTrigger.of(CountTrigger.of(5)))
.fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
  private static final long serialVersionUID = 1L;
  // ...

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testReduceWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DummyReducer reducer = new DummyReducer();
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .trigger(CountTrigger.of(1))
      .reduce(reducer);
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
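
The test above pairs a non-purging CountTrigger.of(1) with reduce(). As a rough illustration of what that combination produces, the plain-Java sketch below emits the running aggregate after every element. It is not Flink code: FireEveryElementSketch and run are hypothetical names, a summing reducer is assumed because DummyReducer is not shown in the excerpt, and only a single window is modeled, whereas a sliding assigner would place each element into several windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java model of "reduce + fire on every element" for one window (hypothetical, not Flink code). */
final class FireEveryElementSketch {

    /** Returns the value emitted after each input element. */
    static <T> List<T> run(List<T> input, BinaryOperator<T> reducer) {
        List<T> emitted = new ArrayList<>();
        T state = null; // stands in for the single aggregated value the window keeps
        for (T element : input) {
            state = (state == null) ? element : reducer.apply(state, element);
            emitted.add(state); // the trigger fires after every element; state is not purged
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3), Integer::sum)); // [1, 3, 6]
    }
}
```

Because the window state is never purged, each firing reflects everything the window has seen so far rather than only the latest element.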

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .trigger(CountTrigger.of(1))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testProcessWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .trigger(CountTrigger.of(1))
      .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(
            Context ctx,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .trigger(CountTrigger.of(1))
      .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  // ...

Code example origin: apache/flink

.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  // ...

Code example origin: apache/flink

.trigger(PurgingTrigger.of(CountTrigger.of(10)))
.fold(0L, new FoldFunction<Long, Long>() {
  @Override
  // ...

Code example origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Windows this {@code DataStream} into tumbling count windows.
 *
 * <p>Note: This operation can be inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only for special cases, such as aligned time windows is
 * it possible to perform this operation in parallel).
 *
 * @param size The size of the windows in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
  return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}

Code example origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Windows this {@code DataStream} into sliding count windows.
 *
 * <p>Note: This operation can be inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only for special cases, such as aligned time windows is
 * it possible to perform this operation in parallel).
 *
 * @param size The size of the windows in number of elements.
 * @param slide The slide interval in number of elements.
 */
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
  return windowAll(GlobalWindows.create())
      .evictor(CountEvictor.of(size))
      .trigger(CountTrigger.of(slide));
}
