reactor.core.publisher.Flux.interval()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.8k)|赞(0)|评价(0)|浏览(1305)

本文整理了Java中reactor.core.publisher.Flux.interval()方法的一些代码示例,展示了Flux.interval()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.interval()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:interval

Flux.interval介绍

[英]Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer. If demand is not produced in time, an onError will be signalled with an Exceptions#isOverflow(Throwable) IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.

Runs on the Schedulers#parallel() Scheduler.
[中]创建一个通量,该通量发射从0开始的长值,并在全局计时器上以指定的时间间隔递增。如果未及时生成需求,则会向onError发出异常信号#isOverflow(Throwable)IllegalStateException,详细说明无法发出的勾号。在正常情况下,通量永远不会完成。
在调度程序#parallel()调度程序上运行。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@GetMapping(path = "/spr16869", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  Flux<String> sseFlux() {
    return Flux.interval(Duration.ofSeconds(1)).take(3)
        .map(aLong -> String.format("event%d", aLong));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@GetMapping(produces = "text/event-stream")
Flux<Person> getPersonStream() {
  return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50)
      .map(index -> new Person("N" + index));
}

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
              .log(log.getName(), Level.FINEST)
              .doOnSubscribe(s -> log.debug("Started reminders"))
              .flatMap(i -> this.sendReminders())
              .onErrorContinue((ex, value) -> log.warn(
                "Unexpected error while sending reminders",
                ex
              ))
              .subscribe();
}

代码示例来源:origin: reactor/reactor-core

private Object[] sources01() {
  return new Object[] {
      new Object[] { Flux.interval(Duration.ofMillis(100)).map(String::valueOf) },
      new Object[] { Flux.range(0, 2).map(String::valueOf) }
  };
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Return an interval stream of N number of ticks and buffer the emissions
 * to avoid back pressure failures (e.g. on slow CI server).
 *
 * <p>Use this method as follows:
 * <ul>
 * <li>Tests that verify N number of items followed by verifyOnComplete()
 * should set the number of emissions to N.
 * <li>Tests that verify N number of items followed by thenCancel() should
 * set the number of buffered to an arbitrary number greater than N.
 * </ul>
 */
public static Flux<Long> testInterval(Duration period, int count) {
  return Flux.interval(period).take(count).onBackpressureBuffer(count);
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public void start() {
  super.start();
  intervalSubscription = Flux.interval(updateInterval)
                .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
                .log(log.getName(), Level.FINEST)
                .subscribeOn(Schedulers.newSingle("status-monitor"))
                .concatMap(i -> this.updateStatusForAllInstances())
                .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
                  ex
                ))
                .subscribe();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() {
  final Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
  assertThat(interval).isInstanceOf(Scannable.class);
  assertThat(((Scannable) interval).scan(Scannable.Attr.RUN_ON))
      .isSameAs(Schedulers.parallel());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnOpenError() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ZERO, Duration.ofMillis(100)) // 0, 1, 2
                      .map(Long::intValue)
                      .take(3)
                      .bufferWhen(Flux.interval(Duration.ZERO, Duration.ofMillis(100)),
                          u -> (u == 2) ? null : Mono.never()))
        .thenAwait(Duration.ofSeconds(2))
        .expectErrorMessage("The bufferClose returned a null Publisher")
        .verifyThenAssertThat()
        .hasDiscardedExactly(0, 1, 1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void tickOverflow() {
  StepVerifier.withVirtualTime(() ->
      Flux.interval(Duration.ofMillis(50))
        .delayUntil(i -> Mono.delay(Duration.ofMillis(250))))
        .thenAwait(Duration.ofMinutes(1))
        .expectNextCount(6)
        .verifyErrorMessage("Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void virtualTimeSchedulerVeryLong() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofMillis(1))
                      .map(tick -> new Date())
                      .take(100000)
                      .collectList())
        .thenAwait(Duration.ofHours(1000))
        .consumeNextWith(list -> Assert.assertTrue(list.size() == 100000))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

Flux<Integer> flatMapScenario() {
  return Flux.interval(Duration.ofSeconds(3))
        .flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
             .flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
                      .subscribeOn(Schedulers.parallel())
                      .flatMapMany(Flux::fromIterable))).log();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyVirtualTimeNoEventInterval() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(3))
                      .take(2))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(3))
        .expectNext(0L)
        .expectNoEvent(Duration.ofSeconds(3))
        .expectNext(1L)
        .expectComplete()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verifyDurationTimeout() {
  Flux<String> flux = Flux.interval(Duration.ofMillis(200))
              .map(l -> "foo")
              .take(2);
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.create(flux)
        .expectNext("foo")
        .expectNext("foo")
        .expectComplete()
        .verify(Duration.ofMillis(300)))
      .withMessageStartingWith("VerifySubscriber timed out on");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void gh783_intervalFullyEmitted() {
  StepVerifier.withVirtualTime(() -> Flux.just("foo").flatMap(message -> Flux.interval(Duration.ofMinutes(5)).take(12)))
        .expectSubscription()
        .expectNoEvent(Duration.ofMinutes(5))
        .expectNext(0L)
        .thenAwait(Duration.ofMinutes(25))
        .expectNext(1L, 2L, 3L, 4L, 5L)
        .thenAwait(Duration.ofMinutes(30))
        .expectNext(6L, 7L, 8L, 9L, 10L, 11L)
        .expectComplete()
        .verify(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 3000)
public void verifyVirtualTimeOnNextIntervalManual() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofMillis(1000), vts)
                      .map(d -> "t" + d))
        .then(() -> vts.advanceTimeBy(Duration.ofHours(1)))
        .expectNextCount(3600)
        .thenCancel()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnCancel() {
  Mono<List<Long>> test = Flux.interval(Duration.ofMillis(100))
                .take(10)
                .collectList();
  StepVerifier.create(test)
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(210))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(0L, 1L);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void gh783_firstSmallAdvance() {
  StepVerifier.withVirtualTime(() -> Flux.just("foo").flatMap(message -> Flux.interval(Duration.ofMinutes(5)).take(12)))
        .expectSubscription()
        .expectNoEvent(Duration.ofMinutes(3))
        .thenAwait(Duration.ofHours(1))
        .expectNextCount(12)
        .expectComplete()
        .verify(Duration.ofMillis(500));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publishOnFilter() throws Exception {
  Flux<Long> flux = Flux.interval(Duration.ofMillis(2)).take(255)
             .publishOn(scheduler)
             .filter(t -> true)
             .doOnNext(i -> onNext(i))
             .doOnError(e -> onError(e));
  verifyRejectedExecutionConsistency(flux, 5);
}

代码示例来源:origin: reactor/reactor-core

@Ignore("delayElements test for local comparison run")
@Test
public void delayElements() {
  Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
                    .onBackpressureDrop()
                    .delayElements(Duration.ofMillis(500))
                    .take(33)
                    .elapsed()
                    .log();
  StepVerifier.create(test)
        .thenConsumeWhile(t2 -> t2.getT1() >= 500)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void shouldBeAbleToScheduleIntervalsWithLowGranularity() {
    StepVerifier.create(Flux.interval(Duration.ofNanos(1)))
          .expectSubscription()
          .expectNext(0L)
          .expectNext(1L)
          .expectNext(2L)
          .thenCancel()
          .verify();
  }
}

相关文章

微信公众号

最新文章

更多

Flux类方法