org.apache.flink.streaming.api.datastream.AllWindowedStream.apply()方法的使用及代码示例

x33g5p2x  于2022-01-17 转载在 其他  
字(18.1k)|赞(0)|评价(0)|浏览(159)

本文整理了Java中org.apache.flink.streaming.api.datastream.AllWindowedStream.apply()方法的一些代码示例,展示了AllWindowedStream.apply()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AllWindowedStream.apply()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.AllWindowedStream
类名称:AllWindowedStream
方法名:apply

AllWindowedStream.apply介绍

[英]Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually. The output of the window function is interpreted as a regular non-windowed stream.

Arriving data is incrementally aggregated using the given fold function.
[中]将给定的窗口函数应用于每个窗口。对于每个关键点的每次窗口评估,都会分别调用window函数。窗口函数的输出被解释为常规的非窗口流。
到达的数据使用给定的fold函数进行增量聚合。

代码示例

代码示例来源:origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given reducer.
 *
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} instead.
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
  TypeInformation<T> inType = input.getType();
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
  return apply(reduceFunction, function, resultType);
}

代码示例来源:origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given fold function.
 *
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction)} instead.
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
  TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
      Utils.getCallLocationName(), true);
  return apply(initialValue, foldFunction, function, resultType);
}

代码示例来源:origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Not that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}

代码示例来源:origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Not that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The process window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
}

代码示例来源:origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Not that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}

代码示例来源:origin: apache/flink

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Not that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The process window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
}

代码示例来源:origin: apache/flink

@Test
public void testApplyWindowAllState() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
  DataStream<File> src = env.fromElements(new File("/"));
  SingleOutputStreamOperator<?> result = src
      .timeWindowAll(Time.milliseconds(1000))
      .apply(new AllWindowFunction<File, String, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
      });
  validateListStateDescriptorConfigured(result);
}

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  // Checking input parameters
  final ParameterTool params = ParameterTool.fromArgs(args);
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Integer> trainingData = env.addSource(new FiniteTrainingDataSource());
  DataStream<Integer> newData = env.addSource(new FiniteNewDataSource());
  // build new model on every second of new data
  DataStream<Double[]> model = trainingData
      .assignTimestampsAndWatermarks(new LinearTimestamp())
      .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
      .apply(new PartialModelBuilder());
  // use partial model for newData
  DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
  // emit result
  if (params.has("output")) {
    prediction.writeAsText(params.get("output"));
  } else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    prediction.print();
  }
  // execute program
  env.execute("Streaming Incremental Learning");
}

代码示例来源:origin: apache/flink

.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS))
.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {

代码示例来源:origin: apache/flink

.timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
.sideOutputLateData(lateDataTag)
.apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyProcessingTimeTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyWithCustomTrigger() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple2<String, Integer>> window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .trigger(CountTrigger.of(1))
      .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          for (Tuple2<String, Integer> in : values) {
            out.collect(in);
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

.apply(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

@Test
@SuppressWarnings("rawtypes")
public void testApplyWithPreFolderEventTime() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
  DataStream<Tuple3<String, String, Integer>> window = source
      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .apply(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void apply(
            TimeWindow window,
            Iterable<Tuple3<String, String, Integer>> values,
            Collector<Tuple3<String, String, Integer>> out) throws Exception {
          for (Tuple3<String, String, Integer> in : values) {
            out.collect(new Tuple3<>(in.f0, in.f1, in.f2));
          }
        }
      });
  OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
  OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
  Assert.assertTrue(operator instanceof WindowOperator);
  WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
  Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
  Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
  Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
  processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}

代码示例来源:origin: apache/flink

.trigger(CountTrigger.of(1))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(5)))
.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
  @Override
  public void apply(GlobalWindow window,

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Not that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Not that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
  String callLocation = Utils.getCallLocationName();
  function = input.getExecutionEnvironment().clean(function);
  TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
  return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}

相关文章