reactor.core.publisher.Flux.delaySequence()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.8k)|赞(0)|评价(0)|浏览(362)

本文整理了Java中reactor.core.publisher.Flux.delaySequence()方法的一些代码示例,展示了Flux.delaySequence()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.delaySequence()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:delaySequence

Flux.delaySequence介绍

[英]Shift this Flux forward in time by a given Duration. Unlike with #delayElements(Duration), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on the Schedulers#parallel()Scheduler, but empty sequences or immediate error signals are not delayed.

With this operator, a source emitting at 10Hz with a delaySequence Durationof 1s will still emit at 10Hz, with an initial "hiccup" of 1s. On the other hand, #delayElements(Duration) would end up emitting at 1Hz.

This is closer to #delaySubscription(Duration), except the source is subscribed to immediately.
[中]将该通量在时间上向前移动给定的持续时间。与#delayElements(Duration)不同,元素在发出时会在时间上向前移动,始终导致两个元素之间的延迟与源中的延迟相同(只有第一个元素明显延迟于前一个事件,即订阅)。信号在调度程序#parallel()调度程序上延迟并继续,但空序列或即时错误信号不会延迟。
使用此运算符,以10Hz发射且延迟序列持续时间为1s的源仍将以10Hz发射,初始“hiccup”为1s。另一方面,延迟元件(持续时间)最终将以1Hz的频率发射。
这更接近于#delaySubscription(Duration),只是源被立即订阅。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Shift this {@link Flux} forward in time by a given {@link Duration}.
 * Unlike with {@link #delayElements(Duration)}, elements are shifted forward in time
 * as they are emitted, always resulting in the delay between two elements being
 * the same as in the source (only the first element is visibly delayed from the
 * previous event, that is the subscription).
 * Signals are delayed and continue on the {@link Schedulers#parallel() parallel}
 * {@link Scheduler}, but empty sequences or immediate error signals are not delayed.
 * <p>
 * With this operator, a source emitting at 10Hz with a delaySequence {@link Duration}
 * of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
 * On the other hand, {@link #delayElements(Duration)} would end up emitting
 * at 1Hz.
 * <p>
 * This is closer to {@link #delaySubscription(Duration)}, except the source
 * is subscribed to immediately.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySequence.svg" alt="">
 *
 * @param delay {@link Duration} to shift the sequence by
 * @return an shifted {@link Flux} emitting at the same frequency as the source
 */
public final Flux<T> delaySequence(Duration delay) {
  return delaySequence(delay, Schedulers.parallel());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyCompletesImmediately() {
  Flux<Long> source = Flux.empty();
  Flux<Long> test = source
      .delaySequence(Duration.ofMillis(1000));
  Duration took = StepVerifier.create(test)
        .expectComplete()
        .verify();
  assertThat(took.toMillis())
      .as("completes immediately")
      .isLessThan(50);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyErrorErrorsImmediately() {
  Flux<Long> source = Flux.error(new IllegalStateException("boom"));
  Flux<Long> test = source
      .delaySequence(Duration.ofMillis(1000));
  Duration took = StepVerifier.create(test)
                .expectSubscription()
                .expectErrorMessage("boom")
                .verify();
  assertThat(took.toMillis())
      .as("errors immediately")
      .isLessThan(50);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void delayFirstInterval() {
  Supplier<Flux<Tuple2<Long, Long>>> test = () -> Flux.interval(Duration.ofMillis(50))
                          .delaySequence(Duration.ofMillis(500))
                          .elapsed()
                            .take(33);
  StepVerifier.withVirtualTime(test)
        .thenAwait(Duration.ofMillis(500 + 50))
        .recordWith(ArrayList::new)
        .assertNext(t2 -> assertThat(t2.getT1()).isEqualTo(550))
        .thenAwait(Duration.ofMillis(33 * 50))
        .thenConsumeWhile(t2 -> t2.getT1() == 50)
        .consumeRecordedWith(record -> {
          assertThat(record.stream().mapToLong(Tuple2::getT2))
               .startsWith(0L, 1L, 2L)
               .endsWith(30L, 31L, 32L)
               .isSorted()
               .hasSize(33);
        })
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void every50msThenErrorDelaysError() {
  Supplier<Flux<Long>> test = () -> {
    Flux<Long> source = Flux.concat(Mono.delay(Duration.ofMillis(50))
                      .then(Mono.just(0L)),
        Mono.delay(Duration.ofMillis(50))
          .then(Mono.just(1L)),
        Mono.delay(Duration.ofMillis(50))
          .then(Mono.just(2L)),
        Mono.error(new IllegalStateException("boom")));
    return source.delaySequence(Duration.ofMillis(1000));
  };
  StepVerifier.withVirtualTime(test)
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(1050))
        .expectNext(0L)
        .expectNoEvent(Duration.ofMillis(50))
        .expectNext(1L)
        .expectNoEvent(Duration.ofMillis(50))
        .expectNext(2L)
        .verifyErrorMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

.delaySequence(Duration.ofMillis(500))
.take(33)
.elapsed();

代码示例来源:origin: reactor/reactor-core

Future<Duration> ex1 = executorService.submit(() -> StepVerifier
    .withVirtualTime(() -> Flux.just("A", "B", "C")
                  .delaySequence(Duration.ofMillis(100))
                  .delaySequence(Duration.ofMillis(100))

代码示例来源:origin: reactor/reactor-core

@Test
public void onErrorAfterCompleteDrops() {
  TestPublisher<String> testPublisher = TestPublisher.createNoncompliant(
      TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(testPublisher
      .flux()
      .delaySequence(Duration.ofMillis(500)))
        .then(testPublisher::complete)
        .then(() -> testPublisher.error(new IllegalStateException("boom")))
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedElements()
        .hasDroppedErrorWithMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onNextAfterErrorDrops() {
  TestPublisher<String> testPublisher = TestPublisher.createNoncompliant(
      TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(testPublisher
      .flux()
      .delaySequence(Duration.ofMillis(500)))
        .then(() -> testPublisher.error(new IllegalStateException("boom")))
        .then(() -> testPublisher.next("foo"))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDropped("foo")
        .hasNotDroppedErrors();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onCompleteAfterComplete() {
  TestPublisher<String> testPublisher = TestPublisher.createNoncompliant(
      TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(testPublisher
      .flux()
      .delaySequence(Duration.ofMillis(500)))
        .then(testPublisher::complete)
        .then(testPublisher::complete)
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedElements()
        .hasNotDroppedErrors();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onNextAfterCompleteDrops() {
  TestPublisher<String> testPublisher = TestPublisher.createNoncompliant(
      TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(testPublisher
      .flux()
      .delaySequence(Duration.ofMillis(500)))
        .then(testPublisher::complete)
        .then(() -> testPublisher.next("foo"))
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped("foo")
        .hasNotDroppedErrors();
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Shift this {@link Flux} forward in time by a given {@link Duration}.
 * Unlike with {@link #delayElements(Duration)}, elements are shifted forward in time
 * as they are emitted, always resulting in the delay between two elements being
 * the same as in the source (only the first element is visibly delayed from the
 * previous event, that is the subscription).
 * Signals are delayed and continue on the {@link Schedulers#parallel() parallel}
 * {@link Scheduler}, but empty sequences or immediate error signals are not delayed.
 * <p>
 * With this operator, a source emitting at 10Hz with a delaySequence {@link Duration}
 * of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
 * On the other hand, {@link #delayElements(Duration)} would end up emitting
 * at 1Hz.
 * <p>
 * This is closer to {@link #delaySubscription(Duration)}, except the source
 * is subscribed to immediately.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySequence.svg" alt="">
 *
 * @param delay {@link Duration} to shift the sequence by
 * @return an shifted {@link Flux} emitting at the same frequency as the source
 */
public final Flux<T> delaySequence(Duration delay) {
  return delaySequence(delay, Schedulers.parallel());
}

相关文章

微信公众号

最新文章

更多

Flux类方法