Usage and code examples of the org.apache.beam.sdk.transforms.windowing.Window.triggering() method

x33g5p2x · Reposted on 2022-02-03 · Category: Other

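This article collects code examples for the Java method org.apache.beam.sdk.transforms.windowing.Window.triggering() and shows how Window.triggering() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Window.triggering() method are as follows: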
Package path: org.apache.beam.sdk.transforms.windowing.Window
Class name: Window
Method name: triggering

Introduction to Window.triggering

Sets a non-default trigger for this Window PTransform. Elements that are assigned to a specific window will be output when the trigger fires.

org.apache.beam.sdk.transforms.windowing.Trigger has more details on the available triggers.

The caller must also specify allowed lateness using #withAllowedLateness and an accumulation mode using either #discardingFiredPanes() or #accumulatingFiredPanes().
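
The accumulation mode decides what a pane contains when a trigger fires more than once for the same window. As a rough illustration, the following plain-Java toy model sums the integers of a single window and emits a pane whenever `threshold` new elements have arrived, loosely mirroring `Repeatedly.forever(AfterPane.elementCountAtLeast(n))`. It does not use Beam, and `PaneModel`, `sumPanes`, and `threshold` are hypothetical names invented for this sketch. It assumes that elements arrive one at a time and that the trigger is checked after each element; a real runner may fire with more elements than the minimum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Beam code) of how the accumulation mode shapes the panes of one window. */
final class PaneModel {

  /**
   * Sums integers for a single window and emits a pane every time `threshold`
   * new elements have arrived since the last firing.
   */
  static List<Integer> sumPanes(List<Integer> elements, int threshold, boolean accumulating) {
    List<Integer> panes = new ArrayList<>();
    int paneSum = 0;       // value the next pane will carry
    int sinceLastFire = 0; // elements seen since the trigger last fired
    for (int element : elements) {
      paneSum += element;
      sinceLastFire++;
      if (sinceLastFire >= threshold) {
        panes.add(paneSum);
        sinceLastFire = 0;
        if (!accumulating) {
          paneSum = 0; // discarding mode: forget what was already emitted
        }
      }
    }
    return panes;
  }

  public static void main(String[] args) {
    List<Integer> scores = Arrays.asList(3, 4, 5, 6);
    System.out.println(sumPanes(scores, 1, true));  // prints [3, 7, 12, 18]
    System.out.println(sumPanes(scores, 1, false)); // prints [3, 4, 5, 6]
    System.out.println(sumPanes(scores, 2, true));  // prints [7, 18]
  }
}
```

In the accumulating case each pane repeats everything seen so far, so a downstream consumer would typically overwrite earlier results; in the discarding case each pane holds only new data, so a consumer would add the panes up.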

Code examples

Code example source: org.apache.beam/beam-sdks-java-core

private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
  return input
    .apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
    // Perform a per-window pre-combine so that our performance does not critically depend
    // on combiner lifting.
    .apply(ParDo.of(new CollectWindowsFn<>()))
    .apply(Sample.any(1))
    .apply(View.asList());
}

Code example source: org.apache.beam/beam-examples-java

@Override
 public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
  return input
    .apply(
      "LeaderboardUserGlobalWindow",
      Window.<GameActionInfo>into(new GlobalWindows())
        // Get periodic results every ten minutes.
        .triggering(
          Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
        .accumulatingFiredPanes()
        .withAllowedLateness(allowedLateness))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"));
 }

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
   .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .withAllowedLateness(Duration.standardDays(1000))
         .accumulatingFiredPanes())
   .apply(Sum.integersPerKey())
   .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testDisplayDataExcludesDefaults() {
 Window<?> window =
   Window.into(new GlobalWindows())
     .triggering(DefaultTrigger.of())
     .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 DisplayData data = DisplayData.from(window);
 assertThat(data, not(hasDisplayItem("trigger")));
 assertThat(data, not(hasDisplayItem("allowedLateness")));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(Repeatedly.forever(AfterProcessingTime
                       .pastFirstElementInPane()
                       .alignedTo(TWO_MINUTES, Utils.parseTime("12:05:00"))))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}
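
The aligned processing-time trigger above delays firing until the next point on a regular time grid instead of a fixed delay after the first element. The arithmetic can be sketched in plain Java as follows. This is a toy calculation, not Beam's implementation: `AlignmentModel` and `alignUp` are hypothetical names, it uses `java.time` rather than the Joda-Time types Beam expects, the date is arbitrary, and returning the input unchanged when it already lies on the grid is an assumption of this sketch.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy arithmetic (not Beam's implementation) for aligning a firing time to a period grid. */
final class AlignmentModel {

  /** Returns the first grid point offset + k * period that is not before firstElementTime. */
  static Instant alignUp(Instant firstElementTime, Duration period, Instant offset) {
    long periodMillis = period.toMillis();
    long sinceOffset = Duration.between(offset, firstElementTime).toMillis();
    // floorMod keeps the remainder non-negative even for times before the offset.
    long intoPeriod = Math.floorMod(sinceOffset, periodMillis);
    return intoPeriod == 0
        ? firstElementTime
        : firstElementTime.plusMillis(periodMillis - intoPeriod);
  }

  public static void main(String[] args) {
    Instant offset = Instant.parse("2022-02-03T12:05:00Z"); // arbitrary date, 12:05:00 grid origin
    Duration twoMinutes = Duration.ofMinutes(2);
    // First element seen at 12:06:30, so the next grid point is 12:07:00.
    System.out.println(alignUp(Instant.parse("2022-02-03T12:06:30Z"), twoMinutes, offset));
    // prints 2022-02-03T12:07:00Z
  }
}
```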

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(TWO_MINUTES)
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}
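
The example above combines two-minute fixed windows with an allowed lateness of two minutes, which bounds how long late elements can still produce panes for a window. A rough plain-Java model of that bookkeeping is sketched below. It is not Beam code: `LatenessModel`, `windowStart`, and `isExpired` are hypothetical names, it uses `java.time` instead of Joda-Time, the date is arbitrary, and it simplifies the exact boundary handling at the window end.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model (not Beam code) of fixed-window assignment and window expiry. */
final class LatenessModel {

  /** Start of the epoch-aligned fixed window of the given size that contains timestamp. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
  }

  /** True once the watermark has moved past the window end plus the allowed lateness. */
  static boolean isExpired(Instant windowEnd, Duration allowedLateness, Instant watermark) {
    return watermark.isAfter(windowEnd.plus(allowedLateness));
  }

  public static void main(String[] args) {
    Duration twoMinutes = Duration.ofMinutes(2);
    Instant start = windowStart(Instant.parse("2022-02-03T12:03:10Z"), twoMinutes);
    Instant end = start.plus(twoMinutes);
    System.out.println(start); // prints 2022-02-03T12:02:00Z
    // Watermark at 12:05:00: late data for this window is still accepted.
    System.out.println(isExpired(end, twoMinutes, Instant.parse("2022-02-03T12:05:00Z"))); // prints false
    // Watermark at 12:06:30: the window has expired and late data would be dropped.
    System.out.println(isExpired(end, twoMinutes, Instant.parse("2022-02-03T12:06:30Z"))); // prints true
  }
}
```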

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .discardingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
  return input
    .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(ONE_MINUTE))
        .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1000))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new FormatAsStrings()));
}

Code example source: takidau/streamingbook

@Override
  public PCollection<String> expand(PCollection<KV<String, Integer>> input) {
    return input
      .apply(Window.<KV<String, Integer>>into(FixedWindows.of(TWO_MINUTES))
          .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.standardDays(1000))
          .accumulatingFiredPanes())
      .apply(Sum.integersPerKey())
      .apply(ParDo.of(new FormatAsStrings()));
  }

Code example source: org.apache.beam/beam-sdks-java-extensions-sql

@Test
public void testUnsupportedGlobalWindowWithDefaultTrigger() {
 exceptions.expect(UnsupportedOperationException.class);
 pipeline.enableAbandonedNodeEnforcement(false);
 PCollection<Row> input =
   unboundedInput1.apply(
     "unboundedInput1.globalWindow",
     Window.<Row>into(new GlobalWindows()).triggering(DefaultTrigger.of()));
 String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 input.apply("testUnsupportedGlobalWindows", SqlTransform.query(sql));
}

Code example source: gojektech/feast

public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
 PCollection<FeatureRowExtended> main = pFeatureRows.getMain();
 PCollection<FeatureRowExtended> errors = pFeatureRows.getErrors();
 if (main.isBounded().equals(IsBounded.UNBOUNDED)) {
  Window<FeatureRowExtended> minuteWindow =
    Window.<FeatureRowExtended>into(FixedWindows.of(Duration.standardMinutes(1L)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .discardingFiredPanes()
      .withAllowedLateness(Duration.standardMinutes(1));
  main = main.apply(minuteWindow);
  errors = errors.apply(minuteWindow);
 }
 main.apply("Sample success", Sample.any(limit))
   .apply("Log success sample", ParDo.of(new LoggerDoFn(Level.INFO, name + " MAIN ")));
 errors
   .apply("Sample errors", Sample.any(limit))
   .apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Override
 public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
  WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();

  return input
    .apply(Reify.windows())
    .apply(
      WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
        .withKeyType(new TypeDescriptor<Integer>() {}))
    .apply(
      Window.into(
          new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
            originalWindowFn.windowCoder()))
        .triggering(Never.ever())
        .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
        .discardingFiredPanes())
    // all values have the same key so they all appear as a single output element
    .apply(GroupByKey.create())
    .apply(Values.create())
    .setWindowingStrategyInternal(input.getWindowingStrategy());
 }

Code example source: org.apache.beam/beam-sdks-java-core

private void testOutputAfterCheckpoint(IsBounded bounded) {
 PCollection<Integer> outputs =
   p.apply(Create.of("foo"))
     .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
     .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
 PAssert.thatSingleton(outputs.apply(Count.globally()))
   .isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
 p.run();
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
@Category(ValidatesRunner.class)
public void testHotKeyCombiningWithAccumulationMode() {
 PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
 PCollection<Integer> output =
   input
     .apply(
       Window.<Integer>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .accumulatingFiredPanes()
         .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
     .apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
     .apply(ParDo.of(new GetLast()));
 PAssert.that(output)
   .satisfies(
     input1 -> {
      assertThat(input1, hasItem(15));
      return null;
     });
 pipeline.run();
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingLateness() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("requires that the allowed lateness");
 pipeline
   .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
   .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
   .apply("Window", Window.into(fixed10))
   .apply("Trigger", Window.<String>configure().triggering(trigger));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testMissingMode() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 PCollection<String> input =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Window", Window.into(fixed10));
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("requires that the accumulation mode");
 input.apply(
   "Triggering",
   Window.<String>configure()
     .withAllowedLateness(Duration.standardDays(1))
     .triggering(trigger));
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoTriggersAndAccumulating() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       Window.<String>into(fixed10)
         .triggering(trigger)
         .accumulatingFiredPanes()
         .withAllowedLateness(Duration.ZERO))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowIntoPropagatesLateness() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply(
       "WindowInto10",
       Window.<String>into(fixed10)
         .withAllowedLateness(Duration.standardDays(1))
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
         .accumulatingFiredPanes())
     .apply("WindowInto25", Window.into(fixed25))
     .getWindowingStrategy();
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
 assertEquals(fixed25, strategy.getWindowFn());
}

Code example source: org.apache.beam/beam-sdks-java-core

@Test
public void testWindowPropagatesEachPart() {
 FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
 Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 WindowingStrategy<?, ?> strategy =
   pipeline
     .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
     .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
     .apply(
       "Lateness",
       Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
     .apply("Trigger", Window.<String>configure().triggering(trigger))
     .apply("Window", Window.into(fixed10))
     .getWindowingStrategy();
 assertEquals(fixed10, strategy.getWindowFn());
 assertEquals(trigger, strategy.getTrigger());
 assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
 assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}
