reactor.core.scheduler.Schedulers类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(1371)

本文整理了Java中reactor.core.scheduler.Schedulers类的一些代码示例,展示了Schedulers类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Schedulers类的具体详情如下:
包路径:reactor.core.scheduler.Schedulers
类名称:Schedulers

Schedulers介绍

[英]Schedulers provides various Scheduler factories useable by reactor.core.publisher.Flux#publishOn or reactor.core.publisher.Mono#subscribeOn :

  • #fromExecutorService(ExecutorService)}.
  • #newParallel: Optimized for fast Runnable executions
  • #single : Optimized for low-latency Runnable executions
  • #immediate.

Factories prefixed with new return a new instance of their flavor of Scheduler, while other factories like #elastic() return a shared instance, that is the one used by operators requiring that flavor as their default Scheduler.
[中]调度器提供了可供反应堆使用的各种调度器工厂。果心出版商通量#出版或反应堆。果心出版商Mono#订阅:
*#来自ExecutorService(ExecutorService)}。
*#newParallel:针对快速可运行执行进行了优化
*#单一:针对低延迟可运行执行进行了优化
*#立即。
前缀为new的工厂返回其调度程序风格的新实例,而像#elastic()这样的其他工厂返回共享实例,即要求将该风格作为默认调度程序的操作员使用的实例。

代码示例

代码示例来源:origin: spring-projects/spring-security

@Override
  public Mono<Authentication> authenticate(Authentication token) {
    return Mono.just(token)
      .publishOn(Schedulers.elastic())
      .flatMap( t -> {
        try {
          return Mono.just(authenticationManager.authenticate(t));
        } catch(Throwable error) {
          return Mono.error(error);
        }
      })
      .filter( a -> a.isAuthenticated());
  }
}

代码示例来源:origin: line/armeria

ArmeriaServerHttpRequest(ServiceRequestContext ctx,
             HttpRequest req,
             DataBufferFactoryWrapper<?> factoryWrapper) {
  super(URI.create(requireNonNull(req, "req").path()),
     null,
     fromArmeriaHttpHeaders(req.headers()));
  this.ctx = requireNonNull(ctx, "ctx");
  this.req = req;
  body = Flux.from(req).cast(HttpData.class).map(factoryWrapper::toDataBuffer)
        // Guarantee that the context is accessible from a controller method
        // when a user specify @RequestBody in order to convert a request body into an object.
        .publishOn(Schedulers.fromExecutor(ctx.contextAwareExecutor()));
}

代码示例来源:origin: resilience4j/resilience4j

/**
 * Creates a RateLimiterOperator.
 *
 * @param <T>         the value type of the upstream and downstream
 * @param rateLimiter the Rate limiter
 * @return a RateLimiterOperator
 */
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter) {
  return of(rateLimiter, Schedulers.parallel());
}

代码示例来源:origin: reactor/reactor-core

@BeforeClass
public static void loadEnv() {
  ioGroup = Schedulers.newElastic("work");
  asyncGroup = Schedulers.newParallel("parallel", 4);
}

代码示例来源:origin: reactor/reactor-core

@BeforeClass
public static void before() {
  scheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor());
  nonBlockingScheduler = Schedulers.newSingle("nonBlockingScheduler");
}

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
              .log(log.getName(), Level.FINEST)
              .doOnSubscribe(s -> log.debug("Started reminders"))
              .flatMap(i -> this.sendReminders())
              .onErrorContinue((ex, value) -> log.warn(
                "Unexpected error while sending reminders",
                ex
              ))
              .subscribe();
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public void start() {
  super.start();
  intervalSubscription = Flux.interval(updateInterval)
                .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
                .log(log.getName(), Level.FINEST)
                .subscribeOn(Schedulers.newSingle("status-monitor"))
                .concatMap(i -> this.updateStatusForAllInstances())
                .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
                  ex
                ))
                .subscribe();
}

代码示例来源:origin: reactor/reactor-core

Flux<Integer> flatMapScenario() {
  return Flux.interval(Duration.ofSeconds(3))
        .flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
             .flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
                      .subscribeOn(Schedulers.parallel())
                      .flatMapMany(Flux::fromIterable))).log();
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("info-updater"))
          .filter(event -> event instanceof InstanceEndpointsDetectedEvent ||
                   event instanceof InstanceStatusChangedEvent ||
                   event instanceof InstanceRegistrationUpdatedEvent)
          .flatMap(this::updateInfo);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanMain() {
  Flux<Integer> parent = Flux.just(1).map(i -> i);
  FluxReplay<Integer> test = new FluxReplay<>(parent, 25, 1000, Schedulers.single());
  Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  Assertions.assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(25);
  Assertions.assertThat(test.scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.single());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardPollAsyncPredicateFail() {
  StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
              .publishOn(Schedulers.newSingle("discardPollAsync"), 1)
              .filter(i -> { throw new IllegalStateException("boom"); })
  )
        .expectFusion(Fuseable.ASYNC)
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscarded(1); //publishOn also might discard the rest
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testAsynchronousRun() {
  Flux.range(1, 2).flatMapSequential(t -> Flux.range(1, 1000)
                        .subscribeOn(Schedulers.single())
  ).publishOn(Schedulers.elastic()).subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertNoError();
  ts.assertValueCount(2000);
}

代码示例来源:origin: line/armeria

private Mono<Void> write(Flux<? extends DataBuffer> publisher) {
  return Mono.defer(() -> {
    final HttpResponse response = HttpResponse.of(
        Flux.concat(Mono.just(headers), publisher.map(factoryWrapper::toHttpData))
          // Publish the response stream on the event loop in order to avoid the possibility of
          // calling subscription.request() from multiple threads while publishing messages
          // with onNext signals or starting the subscription with onSubscribe signal.
          .publishOn(Schedulers.fromExecutor(ctx.eventLoop())));
    future.complete(response);
    return Mono.fromFuture(response.completionFuture());
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void subscribeWithAsyncFusion() {
  Processor<Integer, Integer> processor = EmitterProcessor.create(16);
  StepVerifier.create(processor)
        .then(() -> Flux.range(1, 5).publishOn(Schedulers.elastic()).subscribe(processor))
        .expectNext(1, 2, 3, 4, 5)
        .expectComplete()
        .verify(Duration.ofSeconds(1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void timeoutDropWhenNoCancelWithoutFallback() {
  for (int i = 0; i < 50; i++) {
    StepVerifier.withVirtualTime(
        () -> Flux.just("cat")
             .delaySubscription(Duration.ofMillis(3))
             // We cancel on another scheduler that won't do anything to force it to act like
             // the event is already in flight
             .cancelOn(Schedulers.fromExecutor(r -> {}))
             .timeout(Duration.ofMillis(2))
    )
          .thenAwait(Duration.ofSeconds(5))
          .expectError(TimeoutException.class)
          .verify();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnTimerRejected() {
  Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
  StepVerifier.create(Flux.just(1, 2, 3)
              .doOnNext(n -> scheduler.dispose())
              .bufferTimeout(10, Duration.ofMillis(100), scheduler))
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))
        .verifyThenAssertThat()
        .hasDiscardedExactly(1);
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("notifications")).flatMap(this::sendNotifications);
}

代码示例来源:origin: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(CircuitBreakerOperator.of(circuitBreaker))
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
          .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
  ).expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() {
  final Flux<Integer> flux = Flux.just(1).cancelOn(Schedulers.elastic());
  assertThat(flux).isInstanceOf(Scannable.class);
  assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.elastic());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void missingNextAsync() {
  Flux<String> flux = Flux.just("foo", "bar")
              .publishOn(Schedulers.parallel());
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.create(flux)
        .expectNext("foo")
        .expectComplete()
        .verify())
      .withMessage("expectation \"expectComplete\" failed (expected: onComplete(); actual: onNext(bar))");
}

相关文章