本文整理了Java中reactor.core.publisher.Flux.interval()
方法的一些代码示例,展示了Flux.interval()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.interval()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:interval
[英]Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer. If demand is not produced in time, an onError will be signalled with an Exceptions#isOverflow(Throwable) IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
Runs on the Schedulers#parallel() Scheduler.
[中]创建一个通量,该通量发射从0开始的长值,并在全局计时器上以指定的时间间隔递增。如果未及时生成需求,则会向onError发出异常信号#isOverflow(Throwable)IllegalStateException,详细说明无法发出的勾号。在正常情况下,通量永远不会完成。
在调度程序#parallel()调度程序上运行。
代码示例来源:origin: spring-projects/spring-framework
@GetMapping(path = "/spr16869", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> sseFlux() {
return Flux.interval(Duration.ofSeconds(1)).take(3)
.map(aLong -> String.format("event%d", aLong));
}
}
代码示例来源:origin: spring-projects/spring-framework
@GetMapping(produces = "text/event-stream")
Flux<Person> getPersonStream() {
return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50)
.map(index -> new Person("N" + index));
}
代码示例来源:origin: codecentric/spring-boot-admin
public void start() {
this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
.log(log.getName(), Level.FINEST)
.doOnSubscribe(s -> log.debug("Started reminders"))
.flatMap(i -> this.sendReminders())
.onErrorContinue((ex, value) -> log.warn(
"Unexpected error while sending reminders",
ex
))
.subscribe();
}
代码示例来源:origin: reactor/reactor-core
private Object[] sources01() {
return new Object[] {
new Object[] { Flux.interval(Duration.ofMillis(100)).map(String::valueOf) },
new Object[] { Flux.range(0, 2).map(String::valueOf) }
};
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Return an interval stream of N number of ticks and buffer the emissions
* to avoid back pressure failures (e.g. on slow CI server).
*
* <p>Use this method as follows:
* <ul>
* <li>Tests that verify N number of items followed by verifyOnComplete()
* should set the number of emissions to N.
* <li>Tests that verify N number of items followed by thenCancel() should
* set the number of buffered to an arbitrary number greater than N.
* </ul>
*/
public static Flux<Long> testInterval(Duration period, int count) {
return Flux.interval(period).take(count).onBackpressureBuffer(count);
}
代码示例来源:origin: codecentric/spring-boot-admin
@Override
public void start() {
super.start();
intervalSubscription = Flux.interval(updateInterval)
.doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
.log(log.getName(), Level.FINEST)
.subscribeOn(Schedulers.newSingle("status-monitor"))
.concatMap(i -> this.updateStatusForAllInstances())
.onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
ex
))
.subscribe();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanOperator() {
final Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
assertThat(interval).isInstanceOf(Scannable.class);
assertThat(((Scannable) interval).scan(Scannable.Attr.RUN_ON))
.isSameAs(Schedulers.parallel());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnOpenError() {
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ZERO, Duration.ofMillis(100)) // 0, 1, 2
.map(Long::intValue)
.take(3)
.bufferWhen(Flux.interval(Duration.ZERO, Duration.ofMillis(100)),
u -> (u == 2) ? null : Mono.never()))
.thenAwait(Duration.ofSeconds(2))
.expectErrorMessage("The bufferClose returned a null Publisher")
.verifyThenAssertThat()
.hasDiscardedExactly(0, 1, 1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void tickOverflow() {
StepVerifier.withVirtualTime(() ->
Flux.interval(Duration.ofMillis(50))
.delayUntil(i -> Mono.delay(Duration.ofMillis(250))))
.thenAwait(Duration.ofMinutes(1))
.expectNextCount(6)
.verifyErrorMessage("Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void virtualTimeSchedulerVeryLong() {
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofMillis(1))
.map(tick -> new Date())
.take(100000)
.collectList())
.thenAwait(Duration.ofHours(1000))
.consumeNextWith(list -> Assert.assertTrue(list.size() == 100000))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
Flux<Integer> flatMapScenario() {
return Flux.interval(Duration.ofSeconds(3))
.flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
.flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
.subscribeOn(Schedulers.parallel())
.flatMapMany(Flux::fromIterable))).log();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void verifyVirtualTimeNoEventInterval() {
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(3))
.take(2))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(3))
.expectNext(0L)
.expectNoEvent(Duration.ofSeconds(3))
.expectNext(1L)
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void verifyDurationTimeout() {
Flux<String> flux = Flux.interval(Duration.ofMillis(200))
.map(l -> "foo")
.take(2);
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> StepVerifier.create(flux)
.expectNext("foo")
.expectNext("foo")
.expectComplete()
.verify(Duration.ofMillis(300)))
.withMessageStartingWith("VerifySubscriber timed out on");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void gh783_intervalFullyEmitted() {
StepVerifier.withVirtualTime(() -> Flux.just("foo").flatMap(message -> Flux.interval(Duration.ofMinutes(5)).take(12)))
.expectSubscription()
.expectNoEvent(Duration.ofMinutes(5))
.expectNext(0L)
.thenAwait(Duration.ofMinutes(25))
.expectNext(1L, 2L, 3L, 4L, 5L)
.thenAwait(Duration.ofMinutes(30))
.expectNext(6L, 7L, 8L, 9L, 10L, 11L)
.expectComplete()
.verify(Duration.ofMillis(500));
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 3000)
public void verifyVirtualTimeOnNextIntervalManual() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofMillis(1000), vts)
.map(d -> "t" + d))
.then(() -> vts.advanceTimeBy(Duration.ofHours(1)))
.expectNextCount(3600)
.thenCancel()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancel() {
Mono<List<Long>> test = Flux.interval(Duration.ofMillis(100))
.take(10)
.collectList();
StepVerifier.create(test)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(210))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(0L, 1L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void gh783_firstSmallAdvance() {
StepVerifier.withVirtualTime(() -> Flux.just("foo").flatMap(message -> Flux.interval(Duration.ofMinutes(5)).take(12)))
.expectSubscription()
.expectNoEvent(Duration.ofMinutes(3))
.thenAwait(Duration.ofHours(1))
.expectNextCount(12)
.expectComplete()
.verify(Duration.ofMillis(500));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publishOnFilter() throws Exception {
Flux<Long> flux = Flux.interval(Duration.ofMillis(2)).take(255)
.publishOn(scheduler)
.filter(t -> true)
.doOnNext(i -> onNext(i))
.doOnError(e -> onError(e));
verifyRejectedExecutionConsistency(flux, 5);
}
代码示例来源:origin: reactor/reactor-core
@Ignore("delayElements test for local comparison run")
@Test
public void delayElements() {
Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
.onBackpressureDrop()
.delayElements(Duration.ofMillis(500))
.take(33)
.elapsed()
.log();
StepVerifier.create(test)
.thenConsumeWhile(t2 -> t2.getT1() >= 500)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldBeAbleToScheduleIntervalsWithLowGranularity() {
StepVerifier.create(Flux.interval(Duration.ofNanos(1)))
.expectSubscription()
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.thenCancel()
.verify();
}
}
内容来源于网络,如有侵权,请联系作者删除!