Usage of the org.apache.beam.sdk.transforms.windowing.Window class, with code examples

Reposted by x33g5p2x on 2022-02-03 under Other

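This article collects code examples of the Java class org.apache.beam.sdk.transforms.windowing.Window that show how the class is used in practice. The examples come mainly from GitHub, Stack Overflow and Maven, and were extracted from selected projects, so they should be a useful reference. Details of the Window class:
Package path: org.apache.beam.sdk.transforms.windowing.Window
Class name: Window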

Introduction to Window

Window logically divides up or groups the elements of a PCollection into finite windows according to a WindowFn. The output of Window contains the same elements as the input, but they have been logically assigned to windows. The next GroupByKey (org.apache.beam.sdk.transforms.GroupByKey), including one within a composite transform, will group by the combination of key and window.

See org.apache.beam.sdk.transforms.GroupByKey for more information about how grouping with windows works.

Windowing

Windowing a PCollection divides its elements into windows based on the event time associated with each element. This is especially useful for PCollections of unbounded size, since it allows operating on a subgroup of the elements placed into the same window. For a PCollection of bounded size (that is, conventional batch mode), all data is by default implicitly in a single window unless Window is applied.

For example, a simple form of windowing divides up the data into fixed-width time intervals, using FixedWindows. The following example demonstrates how to use Window in a pipeline that counts the number of occurrences of strings each minute:

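PCollection<String> items = ...;
PCollection<String> windowed_items = items.apply(
  Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
  Count.<String>perElement());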

Let (data, timestamp) denote a data element along with its timestamp. If the input to this pipeline is {("foo", 15s), ("bar", 30s), ("foo", 45s), ("foo", 1m30s)}, the output will be {(KV("foo", 2), 1m), (KV("bar", 1), 1m), (KV("foo", 1), 2m)}, where each count is paired with the end of the one-minute window it covers.
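To make the arithmetic behind that output concrete, here is a minimal plain-Java sketch with no Beam dependency. It assigns each timestamp to the one-minute window [start, start + 1m) that contains it and counts each element per window. FixedWindowCountSketch and its methods are hypothetical names for illustration; the code only mimics the semantics of FixedWindows followed by Count.perElement(), it is not Beam's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Beam dependency) of fixed-window assignment followed by a
// per-window count of each element. It mimics the semantics described above and is
// not Beam's FixedWindows or Count implementation.
class FixedWindowCountSketch {

  // A (data, timestamp) pair as in the text; timestamps are in milliseconds.
  static final class Event {
    final String data;
    final long timestampMillis;

    Event(String data, long timestampMillis) {
      this.data = data;
      this.timestampMillis = timestampMillis;
    }
  }

  // Start of the window [start, start + size) that contains the timestamp.
  static long windowStart(long timestampMillis, long sizeMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  // Maps window end -> (element -> count); both levels are sorted for readable output.
  static Map<Long, Map<String, Long>> countPerWindow(List<Event> events, long sizeMillis) {
    Map<Long, Map<String, Long>> counts = new TreeMap<>();
    for (Event e : events) {
      long windowEnd = windowStart(e.timestampMillis, sizeMillis) + sizeMillis;
      counts.computeIfAbsent(windowEnd, k -> new TreeMap<>()).merge(e.data, 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Event> input = List.of(
        new Event("foo", 15_000), new Event("bar", 30_000),
        new Event("foo", 45_000), new Event("foo", 90_000));
    // Prints {60000={bar=1, foo=2}, 120000={foo=1}}: window ends 1m and 2m.
    System.out.println(countPerWindow(input, 60_000));
  }
}
```

The window start is computed with Math.floorMod so that the same formula would also place negative timestamps into the correct window.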

Several predefined WindowFns are provided:

  • FixedWindows partitions the timestamps into fixed-width intervals.
  • SlidingWindows places data into overlapping fixed-width intervals.
  • Sessions groups data into sessions where each item in a window is separated from the next by no more than a specified gap.
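The window-assignment rules of SlidingWindows and Sessions can be sketched in plain Java as below. This is an illustration under stated assumptions, not Beam code: the class WindowFnSketch is hypothetical, sliding windows are assumed to have a zero offset, and an element exactly `gap` after the previous one is treated as joining the session, following the "no more than a specified gap" wording above; Beam's exact boundary handling may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch (not Beam code) of how two predefined WindowFns assign windows:
// SlidingWindows can put one timestamp into several overlapping windows, and Sessions
// merges per-element proto-windows [t, t + gap) that overlap or touch.
class WindowFnSketch {

  // Half-open interval [start, end) in milliseconds.
  static final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  // All windows of width `size` whose start is a multiple of `period` (offset assumed 0)
  // and that contain t, latest start first.
  static List<Interval> slidingWindows(long t, long size, long period) {
    List<Interval> windows = new ArrayList<>();
    long lastStart = t - Math.floorMod(t, period);
    for (long start = lastStart; start > t - size; start -= period) {
      windows.add(new Interval(start, start + size));
    }
    return windows;
  }

  // Sessions: sort timestamps, open [t, t + gap) per element, and extend the current
  // session while the next element is no more than `gap` after the previous one.
  static List<Interval> sessions(List<Long> timestamps, long gap) {
    List<Long> sorted = new ArrayList<>(timestamps);
    Collections.sort(sorted);
    List<Interval> result = new ArrayList<>();
    for (long t : sorted) {
      int last = result.size() - 1;
      if (last >= 0 && t <= result.get(last).end) {
        result.set(last, new Interval(result.get(last).start, t + gap));
      } else {
        result.add(new Interval(t, t + gap));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long minute = 60_000;
    // 1-minute windows every 30 s: 45 s falls into [30000, 90000) and [0, 60000).
    System.out.println(slidingWindows(45_000, minute, 30_000));
    // 10-minute gap: 0 m and 5 m share a session; 30 m starts a new one.
    System.out.println(sessions(List.of(0L, 5 * minute, 30 * minute), 10 * minute));
  }
}
```

Sessions is the only one of the three that merges windows, which is why it needs the whole set of timestamps rather than one timestamp at a time.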

Custom WindowFns can also be created by subclassing WindowFn.

Triggers

Window#triggering(Trigger) allows specifying a trigger to control when (in processing time) results for the given window can be produced. If unspecified, the default behavior is to trigger first when the watermark passes the end of the window, and then trigger again every time there is late arriving data.

Elements are added to the current window pane as they arrive. When the root trigger fires, output is produced based on the elements in the current pane.

Depending on the trigger, this can be used both to output partial results early, while the whole window is still being processed, and to handle late-arriving data in batches.
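A rough simulation of the default trigger and pane behavior described above, for a single window, may help. It is a plain-Java sketch, not Beam's runner logic: DefaultTriggerSketch is a hypothetical class, every element is assumed to belong to the window, late elements are accepted only while the watermark is before the end of the window plus an allowed lateness (Beam's default allowed lateness is zero, so late firings only appear once it is raised), and panes are discarded after each firing, which is Beam's default accumulation mode.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Beam's implementation) of the default trigger for one
// window [start, windowEnd): fire when the watermark passes the end of the window,
// then fire again for each late element while it is within the allowed lateness.
// Panes are discarded after firing (Beam's default accumulation mode). Simplifications:
// every element is assumed to belong to this window, and an empty pane never fires.
class DefaultTriggerSketch {
  private final long windowEnd;
  private final long allowedLateness;
  private long watermark = Long.MIN_VALUE;
  private final List<String> pane = new ArrayList<>(); // elements not yet emitted
  final List<List<String>> firings = new ArrayList<>(); // one entry per emitted pane
  final List<String> dropped = new ArrayList<>(); // data that arrived after the window expired

  DefaultTriggerSketch(long windowEnd, long allowedLateness) {
    this.windowEnd = windowEnd;
    this.allowedLateness = allowedLateness;
  }

  void onElement(String element) {
    if (watermark >= windowEnd + allowedLateness) {
      dropped.add(element); // window already expired
      return;
    }
    pane.add(element);
    if (watermark >= windowEnd) {
      fire(); // late element: the watermark is already past the end of the window
    }
  }

  void advanceWatermark(long newWatermark) {
    boolean crossesEnd = watermark < windowEnd && newWatermark >= windowEnd;
    watermark = Math.max(watermark, newWatermark);
    if (crossesEnd) {
      fire(); // on-time firing
    }
  }

  private void fire() {
    if (pane.isEmpty()) {
      return;
    }
    firings.add(new ArrayList<>(pane));
    pane.clear(); // discarding mode: the next pane starts empty
  }

  public static void main(String[] args) {
    DefaultTriggerSketch w = new DefaultTriggerSketch(60_000, 600_000); // 1 m window, 10 m lateness
    w.onElement("a");
    w.onElement("b");
    w.advanceWatermark(60_000); // on-time pane [a, b]
    w.onElement("c"); // late pane [c]
    w.advanceWatermark(700_000); // past end + allowed lateness
    w.onElement("d"); // dropped
    System.out.println(w.firings + " dropped=" + w.dropped); // [[a, b], [c]] dropped=[d]
  }
}
```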

Continuing the earlier example, suppose we want to emit the values that are available when the watermark passes the end of the window, and then output any late-arriving elements once per hour of actual (processing) time until we have finished processing the next 24 hours of data. (Using watermark time to decide when to stop tends to be more robust, for example if the data source falls behind for a few days.) We could do the following:

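PCollection<String> items = ...;
PCollection<String> windowed_items = items.apply(
  Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
     .triggering(
         AfterWatermark.pastEndOfWindow()
             // After the on-time pane, fire for late data at most once per processing-time hour.
             .withLateFirings(AfterProcessingTime
                 .pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
     // Keep accepting late data until the watermark is one day past the end of the window.
     .withAllowedLateness(Duration.standardDays(1))
     // Beam requires an accumulation mode once a trigger is set; late panes hold only new data.
     .discardingFiredPanes());
PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
  Count.<String>perElement());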

On the other hand, if we wanted early results every minute of processing time (for each minute in which new elements arrived in the given window), we could do the following:

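PCollection<String> windowed_items = items.apply(
  Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
     .triggering(
         AfterWatermark.pastEndOfWindow()
             // Early panes fire one minute of processing time after the first element of a pane.
             .withEarlyFirings(AfterProcessingTime
                 .pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
     .withAllowedLateness(Duration.ZERO)
     // Each early pane includes every element seen so far in the window.
     .accumulatingFiredPanes());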

After a GroupByKey (org.apache.beam.sdk.transforms.GroupByKey), the trigger is replaced by a continuation trigger that preserves the intent of the upstream trigger. See Trigger#getContinuationTrigger for more information.

See Trigger for details on the available triggers.

Code examples

Code example source: org.apache.beam/beam-examples-java

// TEN_MINUTES, allowedLateness, GameActionInfo and ExtractAndSumScore are defined
// elsewhere in the source project.
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
  return input
      .apply(
          "LeaderboardUserGlobalWindow",
          Window.<GameActionInfo>into(new GlobalWindows())
              // Get periodic results every ten minutes.
              .triggering(
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
              // Each firing re-emits the running total for the whole global window.
              .accumulatingFiredPanes()
              .withAllowedLateness(allowedLateness))
      // Extract and sum username/score pairs from the event data.
      .apply("ExtractUserScore", new ExtractAndSumScore("user"));
}

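Code example source: takidau/streamingbook

// TWO_MINUTES, ONE_MINUTE and FormatAsStrings are defined elsewhere in the source project.
@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        // On-time pane when the watermark passes the end of each two-minute window.
        .triggering(AfterWatermark.pastEndOfWindow()
              // Early panes: one minute of processing time after the first element in a pane.
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              // Late panes: one for every late element.
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        // Keep window state, and accept late data, for up to 1000 days.
        .withAllowedLateness(Duration.standardDays(1000))
        // Each pane holds only the elements that arrived since the previous firing.
        .discardingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}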
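The LeaderBoard example calls accumulatingFiredPanes(), so each periodic firing re-emits every user's total so far, while the streamingbook example calls discardingFiredPanes(), so each early, on-time or late pane carries only the values that arrived since the previous firing. The plain-Java sketch below (hypothetical class name, no Beam dependency) shows the per-key sums each mode would report for three panes holding the values 3, 4 and 5.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Beam code) of the per-key sum that each fired pane reports
// under the two accumulation modes. batches.get(i) holds the values that arrived
// between firing i - 1 and firing i.
class AccumulationModeSketch {

  static List<Integer> paneSums(List<List<Integer>> batches, boolean accumulating) {
    List<Integer> outputs = new ArrayList<>();
    int carried = 0; // total of earlier panes; stays 0 in discarding mode
    for (List<Integer> batch : batches) {
      int paneTotal = carried;
      for (int value : batch) {
        paneTotal += value;
      }
      outputs.add(paneTotal);
      if (accumulating) {
        carried = paneTotal; // accumulating: the next pane includes everything so far
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<List<Integer>> batches = List.of(List.of(3), List.of(4), List.of(5));
    System.out.println("accumulatingFiredPanes: " + paneSums(batches, true)); // [3, 7, 12]
    System.out.println("discardingFiredPanes:   " + paneSums(batches, false)); // [3, 4, 5]
  }
}
```

Roughly, discarding suits a downstream consumer that adds up the panes itself, while accumulating suits one that simply overwrites its previous result with the latest pane.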

Code example source: org.apache.beam/beam-sdks-java-core

private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
  return input
      // Keep the input's windows, but use Never.ever(): no early or on-time firings,
      // so a pane is produced only when each window expires.
      .apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
      // Perform a per-window pre-combine so that our performance does not critically depend
      // on combiner lifting.
      .apply(ParDo.of(new CollectWindowsFn<>()))
      .apply(Sample.any(1))
      .apply(View.asList());
}

Code example source: GoogleCloudPlatform/DataflowTemplates

@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {
 int numShards = spec.getNumShards();
 if (numShards <= 0) {
  try (Consumer<?, ?> consumer = openConsumer(spec)) {
   numShards = consumer.partitionsFor(spec.getTopic()).size();
   LOG.info(
     "Using {} shards for exactly-once writer, matching number of partitions "
       + "for topic '{}'",
     numShards,
     spec.getTopic());
  }
 }
 checkState(numShards > 0, "Could not set number of shards");
 return input
   .apply(
     Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
       .discardingFiredPanes())
   .apply(
     String.format("Shuffle across %d shards", numShards),
     ParDo.of(new Reshard<>(numShards)))
   .apply("Persist sharding", GroupByKey.create())
   .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
   .apply("Persist ids", GroupByKey.create())
   .apply(
     String.format("Write to Kafka topic '%s'", spec.getTopic()),
     ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
 return Window.into(windowFn);
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public PCollection<String> expand(PCollection<String> in) {
  return in.apply(
          "Window",
          // Stamp each grouped result with the earliest input timestamp in its window.
          Window.<String>into(windowFn).withTimestampCombiner(TimestampCombiner.EARLIEST))
      .apply(Count.perElement())
      .apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
      .setCoder(StringUtf8Coder.of());
}
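The example above passes TimestampCombiner.EARLIEST, so each count is stamped with the earliest input timestamp in its window instead of Beam's default, END_OF_WINDOW. The plain-Java sketch below shows the output timestamp each choice would give for two elements in the window [0, 60000) ms; the class and its Mode enum are hypothetical stand-ins named after Beam's constants, not Beam's TimestampCombiner.

```java
import java.util.Collections;
import java.util.List;

// Plain-Java sketch (not Beam code) of the output timestamp a grouped result gets for
// one window [start, windowEnd) under three TimestampCombiner choices. The enum is a
// stand-in named after Beam's constants, not org.apache.beam's TimestampCombiner.
class TimestampCombinerSketch {
  enum Mode { EARLIEST, LATEST, END_OF_WINDOW }

  static long outputTimestamp(Mode mode, List<Long> inputTimestamps, long windowEnd) {
    switch (mode) {
      case EARLIEST:
        return Collections.min(inputTimestamps);
      case LATEST:
        return Collections.max(inputTimestamps);
      case END_OF_WINDOW:
        return windowEnd - 1; // the window's max timestamp, since the end is exclusive
      default:
        throw new IllegalArgumentException("unknown mode: " + mode);
    }
  }

  public static void main(String[] args) {
    List<Long> timestamps = List.of(45_000L, 15_000L); // two elements in window [0, 60000)
    for (Mode mode : Mode.values()) {
      // EARLIEST -> 15000, LATEST -> 45000, END_OF_WINDOW -> 59999
      System.out.println(mode + " -> " + outputTimestamp(mode, timestamps, 60_000));
    }
  }
}
```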

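Code example source: jbonofre/beam-samples

// Fragment: fixed windows of options.getWindowSize() seconds over a PCollection<KV<String, String>>,
// with no allowed lateness and fired panes discarded.
.apply(Window.<KV<String, String>>into(
  FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
  .withAllowedLateness(Duration.ZERO).discardingFiredPanes());

// Separate fragment: the same windowing applied to a PCollection<KV<String, Integer>>.
.apply(Window.<KV<String, Integer>>into(
  FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
  .withAllowedLateness(Duration.ZERO).discardingFiredPanes());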

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayDataExcludesDefaults() {
 Window<?> window =
   Window.into(new GlobalWindows())
     .triggering(DefaultTrigger.of())
     .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 DisplayData data = DisplayData.from(window);
 assertThat(data, not(hasDisplayItem("trigger")));
 assertThat(data, not(hasDisplayItem("allowedLateness")));
}

Code example source: org.apache.beam/beam-sdks-java-core

// Keeps the input's WindowFn and allowed lateness but replaces the trigger with Never.ever().
PTransform<
        PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>,
        PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
    removeTriggering =
        Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
            .triggering(Never.ever())
            .discardingFiredPanes()
            .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());

// Separate fragment: the same reconfiguration applied inline before a GroupByKey.
  .apply(
    "NeverTrigger",
    Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
      .triggering(Never.ever())
      .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
      .discardingFiredPanes())
  .apply("GroupDummyAndContents", GroupByKey.create());

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
 Window<?> onlyHasAccumulationMode = Window.configure().discardingFiredPanes();
 assertThat(
   DisplayData.from(onlyHasAccumulationMode),
   not(
     hasDisplayItem(
       hasKey(
         isOneOf(
           "windowFn",
           "trigger",
           "timestampCombiner",
           "allowedLateness",
           "closingBehavior")))));
 Window<?> noAccumulationMode = Window.into(new GlobalWindows());
 assertThat(
   DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode"))));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingModeViaLateness() {
 FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
 PCollection<String> input =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Window", Window.into(fixed));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("allowed lateness");
 thrown.expectMessage("accumulation mode be specified");
 input.apply(
   "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)));
}

Code example source: org.apache.beam/beam-sdks-java-extensions-sql

@Test
public void testUnsupportedGlobalWindowWithDefaultTrigger() {
 exceptions.expect(UnsupportedOperationException.class);
 pipeline.enableAbandonedNodeEnforcement(false);
 PCollection<Row> input =
   unboundedInput1.apply(
     "unboundedInput1.globalWindow",
     Window.<Row>into(new GlobalWindows()).triggering(DefaultTrigger.of()));
 String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 input.apply("testUnsupportedGlobalWindows", SqlTransform.query(sql));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoAccumulatingLatenessNoTrigger() {
 FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       "Lateness",
       Window.<String>into(fixed)
         .withAllowedLateness(Duration.standardDays(1))
         .accumulatingFiredPanes())
     .getWindowingStrategy();
 assertThat(strategy.isTriggerSpecified(), is(false));
 assertThat(strategy.isModeSpecified(), is(true));
 assertThat(strategy.isAllowedLatenessSpecified(), is(true));
 assertThat(strategy.getMode(), equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES));
 assertThat(strategy.getAllowedLateness(), equalTo(Duration.standardDays(1)));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowGetName() {
 assertEquals(
   "Window.Into()",
   Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))).getName());
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
  return Window.into(windowFn.intoOnlyExisting());
}

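Code example source: org.apache.beam/beam-sdks-java-core

// Two separate fragments. new Duration(4) is a 4 ms Joda-Time duration, and EARLIEST
// stamps each grouped result with the earliest input timestamp in its window.
.apply(
  "WindowClicks",
  Window.<KV<Integer, String>>into(FixedWindows.of(new Duration(4)))
    .withTimestampCombiner(TimestampCombiner.EARLIEST));

.apply(
  "WindowPurchases",
  Window.<KV<Integer, String>>into(FixedWindows.of(new Duration(4)))
    .withTimestampCombiner(TimestampCombiner.EARLIEST));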

Code example source: org.apache.beam/beam-sdks-java-core

p.apply(stream)
  .apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
      .withAllowedLateness(Duration.standardMinutes(1))
      .discardingFiredPanes());

Code example source: org.apache.beam/beam-runners-direct-java

// Fragment: keep the input's windows and allowed lateness, but fire as soon as at least
// one new element is in the pane.
Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
  .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
  .discardingFiredPanes()
  .withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
  .withTimestampCombiner(TimestampCombiner.EARLIEST)

Code example source: org.apache.beam/beam-sdks-java-core

private void testOutputAfterCheckpoint(IsBounded bounded) {
 PCollection<Integer> outputs =
   p.apply(Create.of("foo"))
     .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
     .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
 PAssert.thatSingleton(outputs.apply(Count.globally()))
   .isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
 p.run();
}
