Usage and code examples for the reactor.core.publisher.Flux.sampleTimeout() method

x33g5p2x, reposted on 2022-01-19 under "Other"

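This article collects code examples for the Java method reactor.core.publisher.Flux.sampleTimeout() and shows how Flux.sampleTimeout() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they serve as practical reference material. Details of the Flux.sampleTimeout() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: sampleTimeout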

Introduction to Flux.sampleTimeout

Emit the latest value from this Flux only if there were no new values emitted during the window defined by a companion Publisher derived from that particular value.

Note that this means that the last value in the sequence is always emitted.
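The rule described above can be modelled in plain Java without Reactor on the classpath. The sketch below is a toy, single-threaded model and not Reactor's implementation: `SampleTimeoutModel`, `sample`, and the timeline in `main` are hypothetical names and values chosen for illustration. It assumes the source completes immediately after its last value, and it treats a window that closes at the exact moment the next value arrives as a drop, which is an arbitrary choice made only for this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntToLongFunction;

/** Toy model of the sampleTimeout rule; hypothetical, not Reactor's implementation. */
final class SampleTimeoutModel {

    /**
     * @param arrivalMillis arrival time of each source value, in non-decreasing order
     * @param values        the source values, same length as arrivalMillis
     * @param window        maps a value to the length in ms of its companion window
     * @return the values that would be emitted downstream under this model
     */
    static List<Integer> sample(long[] arrivalMillis, int[] values, IntToLongFunction window) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean last = (i == values.length - 1);
            if (last) {
                // Assumption: the source completes right after its last value,
                // so that value is flushed without waiting for its window.
                emitted.add(values[i]);
                continue;
            }
            long windowEnd = arrivalMillis[i] + window.applyAsLong(values[i]);
            // Emit only if the window closes strictly before the next value arrives.
            if (windowEnd < arrivalMillis[i + 1]) {
                emitted.add(values[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Five values at t=0, a 260 ms pause, then three more values; fixed 250 ms window.
        long[] at = {0, 0, 0, 0, 0, 260, 260, 260};
        int[] values = {1, 2, 3, 4, 5, 80, 90, 100};
        System.out.println(sample(at, values, v -> 250L)); // prints [5, 100]
    }
}
```

The timeline in `main` mirrors the `sampleIncludesLastItem` test shown later in this article: 5 survives because its 250 ms window closes before the 260 ms pause ends, and 100 survives because it is the last value.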

Code examples

Code example source: origin: reactor/reactor-core

/**
 * Emit the latest value from this {@link Flux} only if there were no new values emitted
 * during the window defined by a companion {@link Publisher} derived from that particular
 * value.
 * <p>
 * Note that this means that the last value in the sequence is always emitted.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/sampleTimeoutWithThrottlerFactory.svg" alt="">
 *
 * @reactor.discard This operator discards elements that are not part of the sampling.
 *
 * @param throttlerFactory supply a companion sampler {@link Publisher} which signals
 * the end of the window during which no new emission should occur. If it is the case,
 * the original value triggering the window is emitted.
 * @param <U> the companion reified type
 *
 * @return a {@link Flux} sampled to items not followed by any other item within a window
 * defined by a companion {@link Publisher}
 */
public final <U> Flux<T> sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory) {
  return sampleTimeout(throttlerFactory, Queues.XS_BUFFER_SIZE);
}

Code example source: origin: reactor/reactor-core

Flux<Integer> scenario_sampleTimeoutTime2(){
  return Flux.range(1, 10)
        .delayElements(Duration.ofMillis(300))
        .sampleTimeout(d -> Mono.delay(Duration.ofMillis(100*d)), Integer.MAX_VALUE);
}

Code example source: origin: reactor/reactor-core

Flux<Integer> scenario_sampleTimeoutTime(){
  return Flux.range(1, 10)
        .delayElements(Duration.ofMillis(300))
        .sampleTimeout(d -> Mono.delay(Duration.ofMillis(100*d)), 1);
}
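In the two scenarios above, the companion window grows with the value (100 ms times the value) while the source is spaced 300 ms apart. The sketch below works through that arithmetic on an idealised timeline. It is not measured Reactor output: `ValueDependentWindow` and `classify` are hypothetical names, the spacing is assumed to be exactly 300 ms, and the second argument to `sampleTimeout` is ignored. On a real scheduler, timings jitter, so the boundary case is genuinely timing-dependent.

```java
/** Idealised timeline arithmetic for the scenarios above; hypothetical, not measured output. */
final class ValueDependentWindow {

    static final long SPACING_MS = 300; // from delayElements(Duration.ofMillis(300))
    static final int LAST = 10;         // from Flux.range(1, 10)

    /** Classifies value d under the idealised timeline. */
    static String classify(int d) {
        if (d == LAST) {
            return "emitted (last value, flushed on completion)";
        }
        long windowMs = 100L * d; // from Mono.delay(Duration.ofMillis(100 * d))
        if (windowMs < SPACING_MS) {
            return "emitted (window closes before the next value)";
        }
        if (windowMs == SPACING_MS) {
            return "timing-dependent (window end and next value coincide)";
        }
        return "dropped (next value arrives first)";
    }

    public static void main(String[] args) {
        for (int d = 1; d <= LAST; d++) {
            System.out.println(d + " -> " + classify(d));
        }
    }
}
```

Under these assumptions, values 1 and 2 pass through, value 3 sits on the boundary, values 4 to 9 are dropped, and 10 is emitted because it is the last value.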

Code example source: origin: reactor/reactor-core

@Test
public void sourceErrorsBeforeSamplingNoEmission() {
  Flux<Integer> source = Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException("boom")));
  Duration duration = StepVerifier.create(source
      .sampleTimeout(i -> Mono.delay(Duration.ofMillis(250))))
                  .verifyErrorMessage("boom");
  //sanity check on the sequence duration
  assertThat(duration.toMillis()).isLessThan(250);
}

Code example source: origin: reactor/reactor-core

@Test
public void sourceTerminatesBeforeSamplingEmitsLast() {
  Flux<Integer> source = Flux.just(1, 2).hide();
  Duration duration = StepVerifier.create(source
      .sampleTimeout(i -> Mono.delay(Duration.ofMillis(250))))
                  .expectNext(2)
                  .verifyComplete();
  //sanity check on the sequence duration
  assertThat(duration.toMillis()).isLessThan(250);
}

Code example source: origin: reactor/reactor-core

@Test
public void sampleIncludesLastItem() {
  StepVerifier.withVirtualTime(() ->
      Flux.concat(
          Flux.range(1, 5),
          Mono.delay(Duration.ofMillis(260)).ignoreElement().map(Long::intValue),
          Flux.just(80, 90, 100)
      ).hide()
          .sampleTimeout(i -> Mono.delay(Duration.ofMillis(250))))
      .thenAwait(Duration.ofMillis(500))
      .expectNext(5)
      .expectNext(100)
      .verifyComplete();
}

Code example source: origin: com.aol.cyclops/cyclops-reactor

/**
 * @param throttlerFactory
 * @return
 * @see reactor.core.publisher.Flux#sampleTimeout(java.util.function.Function)
 */
public final <U> Flux<T> sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory) {
  return boxed.sampleTimeout(throttlerFactory);
}

Code example source: origin: com.aol.cyclops/cyclops-reactor

/**
 * @param throttlerFactory
 * @param maxConcurrency
 * @return
 * @see reactor.core.publisher.Flux#sampleTimeout(java.util.function.Function, int)
 */
public final <U> Flux<T> sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory,
    int maxConcurrency) {
  return boxed.sampleTimeout(throttlerFactory, maxConcurrency);
}
