Usage and code examples for the org.apache.flink.streaming.api.datastream.AllWindowedStream.fold() method

x33g5p2x, reposted on 2022-01-17 under Other

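This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.AllWindowedStream.fold() and shows how AllWindowedStream.fold() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. The details of AllWindowedStream.fold() are as follows:
Package path: org.apache.flink.streaming.api.datastream.AllWindowedStream
Class name: AllWindowedStream
Method name: fold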

Introduction to AllWindowedStream.fold

Applies the given fold function to each window. The window function is called for each evaluation of the window for each key individually. The output of the reduce function is interpreted as a regular non-windowed stream.
Note that this Javadoc wording is shared with other windowed-stream methods: AllWindowedStream is non-keyed, so there is no per-key evaluation here, and "reduce function" refers to the fold function.
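To make the description above concrete, here is a minimal plain-Java sketch of what folding one window means: start from an initial value and combine it with each element in turn. It does not use Flink; `SimpleFold` and `FoldSemanticsSketch` are hypothetical names that only mirror the argument order of Flink's `FoldFunction` (accumulator first, element second).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the shape of Flink's FoldFunction<T, R>:
// the accumulator comes first, the incoming element second.
interface SimpleFold<T, R> {
    R fold(R accumulator, T value);
}

class FoldSemanticsSketch {
    // Folds all elements of one window, starting from initialValue.
    static <T, R> R foldWindow(List<T> windowElements, R initialValue, SimpleFold<T, R> f) {
        R acc = initialValue;
        for (T element : windowElements) {
            acc = f.fold(acc, element);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "c");
        // The result type may equal the element type...
        String joined = foldWindow(window, "R:", (acc, v) -> acc + v);
        System.out.println(joined);
        // ...or differ from it, which is what distinguishes fold from reduce.
        Integer totalLength = foldWindow(window, 0, (acc, v) -> acc + v.length());
        System.out.println(totalLength);
    }
}
```

Because the accumulator type is independent of the element type, a fold can, for example, turn a window of strings into a single number.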

Code examples

Code example origin: apache/flink

/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the reduce function is
 * interpreted as a regular non-windowed stream.
 *
 * @param function The fold function.
 * @return The data stream that is the result of applying the fold function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
  if (function instanceof RichFunction) {
    throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
        "Please use fold(FoldFunction, WindowFunction) instead.");
  }
  return fold(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType, resultType);
}

Code example origin: apache/flink

/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the reduce function is
 * interpreted as a regular non-windowed stream.
 *
 * @param function The fold function.
 * @return The data stream that is the result of applying the fold function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
  if (function instanceof RichFunction) {
    throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
        "Please use fold(FoldFunction, WindowFunction) instead.");
  }
  TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
      Utils.getCallLocationName(), true);
  return fold(initialValue, function, resultType);
}
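Both overloads above are deprecated in favour of `aggregate(AggregateFunction)`. The following plain-Java sketch shows how a simple sum fold could be restated in the aggregate shape. `SimpleAggregate` is a hypothetical stand-in that copies the four method names of Flink's `AggregateFunction<IN, ACC, OUT>`; it is not the Flink interface, and `SumAggregate` and `aggregateWindow` are illustrative names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same four methods as Flink's
// AggregateFunction<IN, ACC, OUT>; not the Flink interface itself.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();            // replaces fold's initialValue argument
    ACC add(IN value, ACC accumulator); // replaces FoldFunction.fold(acc, value)
    OUT getResult(ACC accumulator);     // final projection of the accumulator
    ACC merge(ACC a, ACC b);            // combines two partial accumulators
}

// Equivalent of fold(0L, (acc, v) -> acc + v) written as an aggregate.
class SumAggregate implements SimpleAggregate<Long, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(Long value, Long accumulator) { return accumulator + value; }
    public Long getResult(Long accumulator) { return accumulator; }
    public Long merge(Long a, Long b) { return a + b; }
}

class AggregateMigrationSketch {
    // Drives the aggregate over the elements of one window.
    static <IN, ACC, OUT> OUT aggregateWindow(List<IN> window, SimpleAggregate<IN, ACC, OUT> agg) {
        ACC acc = agg.createAccumulator();
        for (IN value : window) {
            acc = agg.add(value, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregateWindow(Arrays.asList(1L, 2L, 3L), new SumAggregate()));
    }
}
```

Note the swapped argument order: the fold function takes the accumulator first, while `add` takes the value first.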

Code example origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given fold function.
 *
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
 */
@PublicEvolving
@Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) {
  TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
    Utils.getCallLocationName(), true);
  TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, foldAccumulatorType);
  return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}

Code example origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given fold function.
 *
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
 */
@PublicEvolving
@Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {
  TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
    Utils.getCallLocationName(), true);
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType);
  return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}
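The two overloads above combine incremental folding with a window function. A rough plain-Java model of that flow, without Flink, is sketched below: elements are folded into one accumulator as they arrive, and at evaluation time the window function receives an iterable holding only that single accumulated value plus the window bounds. `WindowFn` and `evaluate` are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

class FoldThenWindowFunctionSketch {
    // Hypothetical stand-in for a window function: it sees the window bounds
    // and the accumulated value(s), and writes results to an output list.
    interface WindowFn<ACC, R> {
        void apply(long windowStart, long windowEnd, Iterable<ACC> values, List<R> out);
    }

    static <T, ACC, R> List<R> evaluate(
            long windowStart, long windowEnd, List<T> elements,
            ACC initialValue, BiFunction<ACC, T, ACC> fold, WindowFn<ACC, R> fn) {
        // Step 1: incremental aggregation, one element at a time.
        ACC acc = initialValue;
        for (T element : elements) {
            acc = fold.apply(acc, element);
        }
        // Step 2: the window function sees exactly one pre-aggregated value.
        List<R> out = new ArrayList<>();
        fn.apply(windowStart, windowEnd, Collections.singletonList(acc), out);
        return out;
    }

    public static void main(String[] args) {
        List<String> out = evaluate(0L, 1000L, Arrays.asList(1, 2, 3), 0,
                (acc, v) -> acc + v,
                (start, end, values, collector) -> {
                    for (Integer sum : values) {
                        collector.add("[" + start + "," + end + ") sum=" + sum);
                    }
                });
        System.out.println(out);
    }
}
```

The benefit of this split is that only the accumulator needs to be kept per window, while the window function can still attach window metadata to the output.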

Code example origin: apache/flink

@Test
public void testFoldWindowAllState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<String> src = env.fromElements("abc");
  SingleOutputStreamOperator<?> result = src
      .timeWindowAll(Time.milliseconds(1000))
      .fold(new File("/"), new FoldFunction<String, File>() {
        @Override
        public File fold(File a, String e) {
          return null;
        }
      });
  validateStateDescriptorConfigured(result);
}

Code example origin: apache/flink

@Test
public void testSessionWithFoldFails() throws Exception {
  // verify that fold does not work with merging windows
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
  try {
    windowedStream.fold("", new FoldFunction<String, String>() {
      private static final long serialVersionUID = -4567902917104921706L;
      @Override
      public String fold(String accumulator, String value) throws Exception {
        return accumulator;
      }
    });
  } catch (UnsupportedOperationException e) {
    // expected
    // use a catch to ensure that the exception is thrown by the fold
    return;
  }
  fail("The fold call should fail.");
}
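The test above checks that fold is rejected for merging (session) windows. One plausible way to see why, sketched in plain Java under that assumption: when two sessions merge, their partial accumulators have to be combined, but a fold step only knows how to add an element to an accumulator. `ADD_LENGTH` and `MERGE` are hypothetical names for this illustration.

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

class MergingWindowSketch {
    // Fold-style step: (accumulator, element) -> accumulator.
    static final BiFunction<Integer, String, Integer> ADD_LENGTH =
            (acc, s) -> acc + s.length();

    // The extra step a fold function does not provide:
    // (accumulator, accumulator) -> accumulator.
    static final BinaryOperator<Integer> MERGE = Integer::sum;

    public static void main(String[] args) {
        // Two session windows that were accumulated independently.
        Integer sessionA = ADD_LENGTH.apply(0, "Hello");
        Integer sessionB = ADD_LENGTH.apply(0, "Ciao");

        // ADD_LENGTH.apply(sessionA, sessionB) would not compile:
        // the second argument must be a String element, not an accumulator.
        Integer merged = MERGE.apply(sessionA, sessionB);
        System.out.println(merged);
    }
}
```

An aggregate-style function carries such a merge step explicitly, which is one reason it fits merging windows where fold does not.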

Code example origin: apache/flink

.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
  @Override
  public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,

Code example origin: apache/flink

/**
 * .fold() does not support RichFoldFunction, since the fold function is used internally
 * in a {@code FoldingState}.
 */
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        private static final long serialVersionUID = -6448847205314995812L;
        @Override
        public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
            Tuple2<String, Integer> value2) throws Exception {
          return null;
        }
      });
  fail("exception was not thrown");
}

Code example origin: apache/flink

.fold(0L, new FoldFunction<Long, Long>() {
  private static final long serialVersionUID = 1L;

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window = source
      .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .fold(new Tuple3<>("", "", 0), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithWindowFunctionEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .fold(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple3<String, String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple3<String, String, Integer> in : values) {
            out.collect(new Tuple2<>(in.f0, in.f2));
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .trigger(CountTrigger.of(1))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithWindowFunctionProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window = source
      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple3<String, String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple3<String, String, Integer> in : values) {
            out.collect(new Tuple2<>(in.f0, in.f2));
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessAllWindowFunctionEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(
            Context ctx,
            Iterable<Tuple3<String, String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple3<String, String, Integer> in : values) {
            out.collect(new Tuple2<>(in.f0, in.f2));
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessAllWindowFunctionProcessingTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window = source
      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void process(
            Context ctx,
            Iterable<Tuple3<String, String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple3<String, String, Integer> in : values) {
            out.collect(new Tuple2<>(in.f0, in.f2));
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

Code example origin: apache/flink

@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
      .evictor(CountEvictor.of(100))
      .fold(new Tuple3<>("", "", 1), new DummyFolder());
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof EvictingWindowOperator);
  EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
  Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
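Unlike the earlier tests, the evictor test above asserts a ListStateDescriptor rather than a FoldingStateDescriptor. That suggests the elements are buffered and the fold runs only when the window fires, after eviction. A plain-Java sketch of that order of operations follows; it assumes a count-based evictor that keeps the most recent elements, and `evictToLast` and `fireWindow` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class EvictThenFoldSketch {
    // Keeps only the last maxCount elements, similar in spirit to a
    // count-based evictor that drops the oldest elements first.
    static <T> List<T> evictToLast(List<T> buffered, int maxCount) {
        int from = Math.max(0, buffered.size() - maxCount);
        return new ArrayList<>(buffered.subList(from, buffered.size()));
    }

    // On firing: evict first, then fold whatever remains in the buffer.
    static <T, ACC> ACC fireWindow(
            List<T> buffered, int maxCount, ACC initialValue, BiFunction<ACC, T, ACC> fold) {
        ACC acc = initialValue;
        for (T element : evictToLast(buffered, maxCount)) {
            acc = fold.apply(acc, element);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> buffered = Arrays.asList(1, 2, 3, 4, 5);
        // Only 3, 4 and 5 survive eviction before the fold runs.
        System.out.println(fireWindow(buffered, 3, 0, (acc, v) -> acc + v));
    }
}
```

The trade-off is memory: with an evictor every element must be stored until the window fires, so the incremental-aggregation benefit of fold is lost.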

Code example origin: apache/flink

.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(5)))
.fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
  private static final long serialVersionUID = 1L;

Code example origin: apache/flink

.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.fold(
    new Tuple3<>("", "", 1),
    new DummyFolder(),

Code example origin: apache/flink

.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(10)))
.fold(0L, new FoldFunction<Long, Long>() {
  @Override
  public Long fold(Long accumulator, Long value) throws Exception {
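The fragment above is cut off in the source, so its fold body is unknown. As a rough illustration of the pattern it sets up (a global window with a purging count trigger), the plain-Java sketch below assumes the fold is a running sum: every `count` elements the accumulated value is emitted and the state is reset to the initial value. `CountPurgeFoldSketch.run` is a hypothetical name, and the summing behaviour is an assumption, not the original code.

```java
import java.util.ArrayList;
import java.util.List;

class CountPurgeFoldSketch {
    // Emits the folded sum every `count` elements and then resets the
    // accumulator, imitating a count trigger wrapped in a purging trigger.
    static List<Long> run(List<Long> input, int count) {
        List<Long> emitted = new ArrayList<>();
        long acc = 0L;   // initial value of the fold
        int seen = 0;    // elements since the last firing
        for (long value : input) {
            acc += value;            // assumed fold step: running sum
            seen++;
            if (seen == count) {     // trigger fires
                emitted.add(acc);
                acc = 0L;            // purge: state goes back to the initial value
                seen = 0;
            }
        }
        // Elements after the last full batch are never emitted here.
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> input = new ArrayList<>();
        for (long i = 1; i <= 25; i++) {
            input.add(i);
        }
        System.out.println(run(input, 10));
    }
}
```

With 25 inputs and a count of 10, only two results are produced; the trailing five elements stay in state because the trigger never fires for them.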
