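This article collects code examples for the Java class reactor.core.scheduler.Schedulers and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Schedulers class are as follows:
Package path: reactor.core.scheduler.Schedulers
Class name: Schedulers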
Schedulers provides various Scheduler factories usable by reactor.core.publisher.Flux#publishOn or reactor.core.publisher.Mono#subscribeOn:
* #fromExecutorService(ExecutorService)
* #newParallel: optimized for fast Runnable executions
* #single: optimized for low-latency Runnable executions
* #immediate
Factories prefixed with new return a new instance of their flavor of Scheduler, while other factories such as #elastic() return a shared instance, which is the one used by operators that require that flavor as their default Scheduler.
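The difference between shared factories and new-prefixed factories, and the effect of publishOn, can be sketched as follows. This is a minimal illustration and not taken from any of the projects listed here; it assumes reactor-core 3.x is on the classpath, and the class name SchedulersSketch and the thread names starting with "sketch-" are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.List;

public class SchedulersSketch {

    // Shared factories such as parallel() hand back the same cached instance on each call.
    public static boolean sharedInstanceIsReused() {
        return Schedulers.parallel() == Schedulers.parallel();
    }

    // new-prefixed factories create a fresh Scheduler per call; the caller owns it
    // and is responsible for disposing it.
    public static boolean newInstancesAreDistinct() {
        Scheduler a = Schedulers.newSingle("sketch-a");
        Scheduler b = Schedulers.newSingle("sketch-b");
        try {
            return a != b;
        } finally {
            a.dispose();
            b.dispose();
        }
    }

    // publishOn moves the downstream operators (here, map) onto the given Scheduler,
    // so the collected names are those of the Scheduler's worker thread.
    public static List<String> threadNamesAfterPublishOn() {
        Scheduler scheduler = Schedulers.newSingle("sketch-publish");
        try {
            return Flux.range(1, 3)
                .publishOn(scheduler)
                .map(i -> Thread.currentThread().getName())
                .collectList()
                .block();
        } finally {
            scheduler.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(sharedInstanceIsReused());
        System.out.println(newInstancesAreDistinct());
        System.out.println(threadNamesAfterPublishOn());
    }
}
```

Disposing the Schedulers created with newSingle in a finally block keeps the sketch from leaking threads; the shared instance returned by parallel() is left alone because other operators may be using it.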
Code example source: spring-projects/spring-security
@Override
public Mono<Authentication> authenticate(Authentication token) {
    return Mono.just(token)
        // Move the blocking authenticate() call below onto the elastic Scheduler
        .publishOn(Schedulers.elastic())
        .flatMap(t -> {
            try {
                return Mono.just(authenticationManager.authenticate(t));
            } catch (Throwable error) {
                return Mono.error(error);
            }
        })
        .filter(a -> a.isAuthenticated());
}
Code example source: line/armeria
ArmeriaServerHttpRequest(ServiceRequestContext ctx,
                         HttpRequest req,
                         DataBufferFactoryWrapper<?> factoryWrapper) {
    super(URI.create(requireNonNull(req, "req").path()),
          null,
          fromArmeriaHttpHeaders(req.headers()));
    this.ctx = requireNonNull(ctx, "ctx");
    this.req = req;
    body = Flux.from(req).cast(HttpData.class).map(factoryWrapper::toDataBuffer)
        // Guarantee that the context is accessible from a controller method
        // when a user specifies @RequestBody in order to convert a request body into an object.
        .publishOn(Schedulers.fromExecutor(ctx.contextAwareExecutor()));
}
Code example source: resilience4j/resilience4j
/**
 * Creates a RateLimiterOperator.
 *
 * @param <T> the value type of the upstream and downstream
 * @param rateLimiter the rate limiter
 * @return a RateLimiterOperator
 */
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter) {
    return of(rateLimiter, Schedulers.parallel());
}
Code example source: reactor/reactor-core
@BeforeClass
public static void loadEnv() {
    ioGroup = Schedulers.newElastic("work");
    asyncGroup = Schedulers.newParallel("parallel", 4);
}
Code example source: reactor/reactor-core
@BeforeClass
public static void before() {
    scheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor());
    nonBlockingScheduler = Schedulers.newSingle("nonBlockingScheduler");
}
Code example source: codecentric/spring-boot-admin
public void start() {
    this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
        .log(log.getName(), Level.FINEST)
        .doOnSubscribe(s -> log.debug("Started reminders"))
        .flatMap(i -> this.sendReminders())
        .onErrorContinue((ex, value) -> log.warn(
            "Unexpected error while sending reminders",
            ex
        ))
        .subscribe();
}
Code example source: codecentric/spring-boot-admin
@Override
public void start() {
    super.start();
    intervalSubscription = Flux.interval(updateInterval)
        .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
        .log(log.getName(), Level.FINEST)
        .subscribeOn(Schedulers.newSingle("status-monitor"))
        .concatMap(i -> this.updateStatusForAllInstances())
        .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
            ex
        ))
        .subscribe();
}
Code example source: reactor/reactor-core
Flux<Integer> flatMapScenario() {
    return Flux.interval(Duration.ofSeconds(3))
        .flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
            .flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
                .subscribeOn(Schedulers.parallel())
                .flatMapMany(Flux::fromIterable))).log();
}
Code example source: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("info-updater"))
        .filter(event -> event instanceof InstanceEndpointsDetectedEvent ||
                         event instanceof InstanceStatusChangedEvent ||
                         event instanceof InstanceRegistrationUpdatedEvent)
        .flatMap(this::updateInfo);
}
Code example source: reactor/reactor-core
@Test
public void scanMain() {
    Flux<Integer> parent = Flux.just(1).map(i -> i);
    FluxReplay<Integer> test = new FluxReplay<>(parent, 25, 1000, Schedulers.single());
    Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
    Assertions.assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(25);
    Assertions.assertThat(test.scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.single());
}
Code example source: reactor/reactor-core
@Test
public void discardPollAsyncPredicateFail() {
    StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // range uses tryOnNext, so let's use just instead
            .publishOn(Schedulers.newSingle("discardPollAsync"), 1)
            .filter(i -> { throw new IllegalStateException("boom"); })
        )
        .expectFusion(Fuseable.ASYNC)
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscarded(1); // publishOn also might discard the rest
}
Code example source: reactor/reactor-core
@Test
public void testAsynchronousRun() {
    Flux.range(1, 2).flatMapSequential(t -> Flux.range(1, 1000)
        .subscribeOn(Schedulers.single())
    ).publishOn(Schedulers.elastic()).subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertNoError();
    ts.assertValueCount(2000);
}
Code example source: line/armeria
private Mono<Void> write(Flux<? extends DataBuffer> publisher) {
    return Mono.defer(() -> {
        final HttpResponse response = HttpResponse.of(
            Flux.concat(Mono.just(headers), publisher.map(factoryWrapper::toHttpData))
                // Publish the response stream on the event loop in order to avoid the possibility of
                // calling subscription.request() from multiple threads while publishing messages
                // with onNext signals or starting the subscription with onSubscribe signal.
                .publishOn(Schedulers.fromExecutor(ctx.eventLoop())));
        future.complete(response);
        return Mono.fromFuture(response.completionFuture());
    });
}
Code example source: reactor/reactor-core
@Test
public void subscribeWithAsyncFusion() {
    Processor<Integer, Integer> processor = EmitterProcessor.create(16);
    StepVerifier.create(processor)
        .then(() -> Flux.range(1, 5).publishOn(Schedulers.elastic()).subscribe(processor))
        .expectNext(1, 2, 3, 4, 5)
        .expectComplete()
        .verify(Duration.ofSeconds(1));
}
Code example source: reactor/reactor-core
@Test
public void timeoutDropWhenNoCancelWithoutFallback() {
    for (int i = 0; i < 50; i++) {
        StepVerifier.withVirtualTime(
                () -> Flux.just("cat")
                    .delaySubscription(Duration.ofMillis(3))
                    // We cancel on another scheduler that won't do anything to force it to act like
                    // the event is already in flight
                    .cancelOn(Schedulers.fromExecutor(r -> {}))
                    .timeout(Duration.ofMillis(2))
            )
            .thenAwait(Duration.ofSeconds(5))
            .expectError(TimeoutException.class)
            .verify();
    }
}
Code example source: reactor/reactor-core
@Test
public void discardOnTimerRejected() {
    Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
    StepVerifier.create(Flux.just(1, 2, 3)
            .doOnNext(n -> scheduler.dispose())
            .bufferTimeout(10, Duration.ofMillis(100), scheduler))
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))
        .verifyThenAssertThat()
        .hasDiscardedExactly(1);
}
Code example source: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("notifications")).flatMap(this::sendNotifications);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
            Flux.error(new IOException("BAM!"))
                .transform(CircuitBreakerOperator.of(circuitBreaker))
                .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
                .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
        ).expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
}
Code example source: reactor/reactor-core
@Test
public void scanOperator() {
    final Flux<Integer> flux = Flux.just(1).cancelOn(Schedulers.elastic());
    assertThat(flux).isInstanceOf(Scannable.class);
    assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.elastic());
}
Code example source: reactor/reactor-core
@Test
public void missingNextAsync() {
    Flux<String> flux = Flux.just("foo", "bar")
        .publishOn(Schedulers.parallel());
    assertThatExceptionOfType(AssertionError.class)
        .isThrownBy(() -> StepVerifier.create(flux)
            .expectNext("foo")
            .expectComplete()
            .verify())
        .withMessage("expectation \"expectComplete\" failed (expected: onComplete(); actual: onNext(bar))");
}