Usage and code examples of the reactor.core.scheduler.Schedulers.parallel() method

x33g5p2x, reposted on 2022-01-30 in Other

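This article collects code examples of the Java method reactor.core.scheduler.Schedulers.parallel() and shows how Schedulers.parallel() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Schedulers.parallel() method are as follows:
Package: reactor.core.scheduler
Class name: Schedulers
Method name: parallel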

Introduction to Schedulers.parallel

Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
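To make the description above more concrete, here is a minimal sketch of what "a fixed pool of single-threaded ExecutorService-based workers" can look like when built from java.util.concurrent alone. It is an illustration only and not Reactor's implementation: the class `FixedWorkerPoolSketch`, the thread names `sketch-worker-N`, the round-robin pick, and the helper `threadNamesFor` are all hypothetical names and choices made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; this is NOT how Reactor implements Schedulers.parallel(). */
public class FixedWorkerPoolSketch {

    private final ExecutorService[] workers;
    private final AtomicInteger next = new AtomicInteger();

    public FixedWorkerPoolSketch(int size) {
        workers = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            final String name = "sketch-worker-" + i;
            // Each worker owns exactly one daemon thread, so its tasks run sequentially.
            workers[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, name);
                t.setDaemon(true);
                return t;
            });
        }
    }

    /** Picks the next worker in round-robin order (an assumption made for this sketch). */
    public ExecutorService pickWorker() {
        int index = Math.floorMod(next.getAndIncrement(), workers.length);
        return workers[index];
    }

    public void shutdown() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }

    /** Submits one task per pick and returns the name of the thread each task ran on. */
    public static List<String> threadNamesFor(int poolSize, int picks) throws Exception {
        FixedWorkerPoolSketch pool = new FixedWorkerPoolSketch(poolSize);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < picks; i++) {
                futures.add(pool.pickWorker().submit(task));
            }
            List<String> names = new ArrayList<>();
            for (Future<String> future : futures) {
                names.add(future.get(5, TimeUnit.SECONDS));
            }
            return names;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // With 2 workers and 4 picks, the worker names alternate.
        for (String name : threadNamesFor(2, 4)) {
            System.out.println(name);
        }
    }
}
```

The point of the sketch is the shape of the pool: the number of workers is fixed up front, and work handed to one worker always runs on that worker's single thread. In the Reactor examples that follow, operators such as publishOn, subscribeOn, and runOn receive Schedulers.parallel() and obtain their workers from it.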

Code examples

Code example source: resilience4j/resilience4j

/**
 * Creates a RateLimiterOperator.
 *
 * @param <T>         the value type of the upstream and downstream
 * @param rateLimiter the Rate limiter
 * @return a RateLimiterOperator
 */
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter) {
  return of(rateLimiter, Schedulers.parallel());
}

Code example source: resilience4j/resilience4j

/**
 * Creates a BulkheadOperator.
 *
 * @param <T>      the value type of the upstream and downstream
 * @param bulkhead the Bulkhead
 * @return a BulkheadOperator
 */
public static <T> BulkheadOperator<T> of(Bulkhead bulkhead) {
  return of(bulkhead, Schedulers.parallel());
}

Code example source: reactor/reactor-core

@Test
public void sourceCachedNoCoordinatorLeak() {
  TestPublisher<Integer> source = TestPublisher.create();
  MonoCacheTime<Integer> cached = new MonoCacheTime<>(source.mono(), Duration.ofSeconds(2),
      Schedulers.parallel());
  cached.subscribe();
  WeakReference<Signal<Integer>> refCoordinator = new WeakReference<>(cached.state);
  assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);
  source.emit(100);
  System.gc();
  assertThat(refCoordinator.get()).isNull();
}

Code example source: reactor/reactor-core

@Test
public void scanOperator() {
  final Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
  assertThat(interval).isInstanceOf(Scannable.class);
  assertThat(((Scannable) interval).scan(Scannable.Attr.RUN_ON))
      .isSameAs(Schedulers.parallel());
}

Code example source: reactor/reactor-core

@Test
public void advancedParallelParallelized() {
  Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
}

Code example source: reactor/reactor-core

@Test
public void scanOperator() {
  final Flux<List<Integer>> flux = Flux.just(1).bufferTimeout(3, Duration.ofSeconds(1));
  assertThat(flux).isInstanceOf(Scannable.class);
  assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.parallel());
}

Code example source: reactor/reactor-core

@Test
public void testSubscribeOnValueFusion() {
  StepVerifier.create(Mono.just(1)
                          .flatMapMany(f -> Mono.just(f + 1)
                                                .subscribeOn(Schedulers.parallel())
                                                .map(this::slow)))
              .expectFusion(Fuseable.ASYNC, Fuseable.NONE)
              .expectNext(2)
              .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void fromPublishersSequentialSubscribe() {
  List<Integer> values = Collections.synchronizedList(new ArrayList<>(10));
  ParallelFlux.from(Flux.range(1, 3), Flux.range(4, 3))
        .runOn(Schedulers.parallel())
        .doOnNext(values::add)
        .sequential()
        .blockLast();
  assertThat(values)
       .hasSize(6)
       .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}

Code example source: reactor/reactor-core

Flux<Integer> flatMapScenario() {
  return Flux.interval(Duration.ofSeconds(3))
             .flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
                               .flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
                                                 .subscribeOn(Schedulers.parallel())
                                                 .flatMapMany(Flux::fromIterable)))
             .log();
}

Code example source: reactor/reactor-core

@Test
public void runOnZeroPrefetchRejected() {
  ParallelFlux<Integer> validSoFar = ParallelFlux.from(Mono.just(1));
  Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
       .isThrownBy(() -> validSoFar.runOn(Schedulers.parallel(), 0))
       .withMessage("prefetch > 0 required but it was 0");
}

Code example source: reactor/reactor-core

@Test
public void runOnNegativePrefetchRejected() {
  ParallelFlux<Integer> validSoFar = ParallelFlux.from(Mono.just(1));
  Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
       .isThrownBy(() -> validSoFar.runOn(Schedulers.parallel(), -1))
       .withMessage("prefetch > 0 required but it was -1");
}

Code example source: reactor/reactor-core

@Test
public void createStreamFromMonoCreate2() {
  StepVerifier.create(Mono.create(MonoSink::success)
              .publishOn(Schedulers.parallel()))
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void transformChangesPrefetch() {
  assertThat(ParallelFlux.from(Flux.range(1, 10), 3, 12, Queues.small())
              .transform(pf -> pf.runOn(Schedulers.parallel(), 3)
                       .log()
                       .hide())
              .getPrefetch())
      .isEqualTo(3);
}

Code example source: reactor/reactor-core

@Test
public void expectNextAsync() {
  Flux<String> flux = Flux.just("foo", "bar")
              .publishOn(Schedulers.parallel());
  StepVerifier.create(flux)
        .expectNext("foo")
        .expectNext("bar")
        .expectComplete()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void missingNextAsync() {
  Flux<String> flux = Flux.just("foo", "bar")
              .publishOn(Schedulers.parallel());
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.create(flux)
        .expectNext("foo")
        .expectComplete()
        .verify())
      .withMessage("expectation \"expectComplete\" failed (expected: onComplete(); actual: onNext(bar))");
}

Code example source: reactor/reactor-core

@Test
public void fluxCreateSerialized2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void fluxCreateDrop2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.DROP).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void fluxCreateLatest2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.LATEST).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void fluxCreateIgnore2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.IGNORE).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void fluxCreateError2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.ERROR).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}
