reactor.core.scheduler.Schedulers.fromExecutorService()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(6.6k)|赞(0)|评价(0)|浏览(381)

本文整理了Java中reactor.core.scheduler.Schedulers.fromExecutorService()方法的一些代码示例,展示了Schedulers.fromExecutorService()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Schedulers.fromExecutorService()方法的具体详情如下:
包路径:reactor.core.scheduler.Schedulers
类名称:Schedulers
方法名:fromExecutorService

Schedulers.fromExecutorService介绍

[英]Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators.
[中]创建一个调度器(Scheduler),该调度器使用底层的ExecutorService为异步操作符调度Runnable任务。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * One-time class setup that assigns the two scheduler fields.
 * {@code scheduler} is backed by a newly created single-thread executor;
 * {@code nonBlockingScheduler} comes from {@code Schedulers.newSingle}.
 */
@BeforeClass
public static void before() {
  // Build the backing executor first, then wrap it in a Scheduler.
  java.util.concurrent.ExecutorService backing = Executors.newSingleThreadExecutor();
  scheduler = Schedulers.fromExecutorService(backing);

  nonBlockingScheduler = Schedulers.newSingle("nonBlockingScheduler");
}

代码示例来源:origin: reactor/reactor-core

/**
 * An error raised by the source must still reach the subscriber, with its
 * original message, after passing through {@code publishOn} on a scheduler
 * built from {@code exec}.
 */
@Test
public void error() {
  Flux<Object> failing = Flux.error(new RuntimeException("forced failure"));

  StepVerifier.create(failing.publishOn(Schedulers.fromExecutorService(exec)))
        .verifyErrorMessage("forced failure");
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks when the callable passed to {@code Mono.fromCallable} is invoked:
 * the counter must still be 0 after the Mono has been assembled, and exactly
 * 1 once a subscriber has been attached and the wait has returned.
 */
@Test
public void callableEvaluatedTheRightTime() {
  AtomicInteger count = new AtomicInteger();
  Mono<Integer> p = Mono.fromCallable(count::incrementAndGet)
                        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));

  // Assembling the pipeline alone must not run the callable.
  Assert.assertEquals(0, count.get());

  // Use a bounded wait (the same 5 s limit the other examples use) instead of
  // an unbounded await(), so a lost terminal signal cannot block the test run
  // indefinitely. Fully qualified to avoid requiring a new import.
  p.subscribeWith(AssertSubscriber.create()).await(java.time.Duration.ofSeconds(5));

  Assert.assertEquals(1, count.get());
}

代码示例来源:origin: reactor/reactor-core

/**
 * Iterable-backed source whose mapper returns {@code null} for the value 2.
 * After {@code publishOn}, the subscriber is expected to see the value 1,
 * then a {@link NullPointerException}, and no completion.
 */
@Test
public void mappedsyncSourceWithNull2() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> nullOnTwo = Flux.fromIterable(Arrays.asList(1, 2))
                                .map(v -> v == 2 ? null : v);
  nullOnTwo.publishOn(Schedulers.fromExecutorService(exec))
           .subscribe(subscriber);

  // Wait up to five seconds for a terminal signal before asserting.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValues(1);
  subscriber.assertError(NullPointerException.class);
  subscriber.assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * An empty {@code Mono} subscribed on a scheduler built from the common
 * fork-join pool must complete without emitting a value or an error.
 */
@Test
public void classicEmpty() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Mono<Integer> nothing = Mono.empty();
  nothing.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
         .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertNoValues();
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Same scenario as the plain empty-Mono case, but the subscriber is created
 * with {@code AssertSubscriber.create(0)} (presumably zero initial demand;
 * confirm against AssertSubscriber). Completion is still expected, with no
 * values and no error.
 */
@Test
public void classicEmptyBackpressured() throws Exception {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create(0);

  Mono<Integer> nothing = Mono.empty();
  nothing.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
         .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertNoValues();
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * A single-value {@code Mono} subscribed on a scheduler built from the common
 * fork-join pool must deliver that value and then complete without error.
 */
@Test
public void classicJust() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Mono<Integer> one = Mono.just(1);
  one.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
     .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValues(1);
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * {@code Flux.just(1, 2)} with a mapper that returns {@code null} for 2.
 * After {@code publishOn}, the subscriber is expected to receive 1, then a
 * {@link NullPointerException}, and never a completion signal.
 */
@Test
public void mappedsyncSourceWithNull() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> nullOnTwo = Flux.just(1, 2)
                                .map(v -> v == 2 ? null : v);
  nullOnTwo.publishOn(Schedulers.fromExecutorService(exec))
           .subscribe(subscriber);

  // Wait up to five seconds for a terminal signal before asserting.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValues(1);
  subscriber.assertError(NullPointerException.class);
  subscriber.assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Variant of the null-mapping case in which {@code hide()} is applied to the
 * source before the mapper. Expected outcome is unchanged: the value 1, then
 * a {@link NullPointerException}, and no completion.
 */
@Test
public void mappedsyncSourceWithNullHidden() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> hiddenSource = Flux.just(1, 2).hide();
  Flux<Integer> nullOnTwo = hiddenSource.map(v -> v == 2 ? null : v);
  nullOnTwo.publishOn(Schedulers.fromExecutorService(exec))
           .subscribe(subscriber);

  // Wait up to five seconds for a terminal signal before asserting.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValues(1);
  subscriber.assertError(NullPointerException.class);
  subscriber.assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * A single-value {@code Flux} passed through {@code publishOn} on a scheduler
 * built from {@code exec} must deliver that value and complete without error.
 */
@Test
public void classicJust() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> one = Flux.just(1);
  one.publishOn(Schedulers.fromExecutorService(exec))
     .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValues(1);
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * The source itself contains a {@code null} element ({@code 1, null, 1}).
 * After {@code publishOn}, the subscriber is expected to receive the first 1,
 * then a {@link NullPointerException}, and no completion.
 */
@Test
public void syncSourceWithNull() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> withNull = Flux.just(1, null, 1);
  withNull.publishOn(Schedulers.fromExecutorService(exec))
          .subscribe(subscriber);

  // Wait up to five seconds for a terminal signal before asserting.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValues(1);
  subscriber.assertError(NullPointerException.class);
  subscriber.assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * A supplier-backed {@code Mono} subscribed on a scheduler built from the
 * common fork-join pool must emit exactly one value and complete without error.
 */
@Test
public void classic() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Mono<Integer> supplied = Mono.fromSupplier(() -> 1);
  supplied.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
          .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValueCount(1);
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * An empty {@code Flux} subscribed on the common fork-join pool, observed by a
 * subscriber created with {@code AssertSubscriber.create(0)} (presumably zero
 * initial demand; confirm against AssertSubscriber). It must still complete
 * with no values and no error.
 */
@Test
public void classicEmptyBackpressured() throws Exception {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create(0);

  Flux<Integer> nothing = Flux.empty();
  nothing.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
         .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertNoValues();
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * A 1000-element range subscribed on a scheduler built from the common
 * fork-join pool must deliver all 1000 values and complete without error.
 */
@Test
public void classic() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> numbers = Flux.range(1, 1000);
  numbers.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
         .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertValueCount(1000);
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * An empty {@code Flux} subscribed on a scheduler built from the common
 * fork-join pool must complete without emitting a value or an error.
 */
@Test
public void classicEmpty() {
  AssertSubscriber<Integer> subscriber = AssertSubscriber.create();

  Flux<Integer> nothing = Flux.empty();
  nothing.subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
         .subscribe(subscriber);

  // Wait up to five seconds for the terminal signal.
  subscriber.await(Duration.ofSeconds(5));
  subscriber.assertNoValues();
  subscriber.assertNoError();
  subscriber.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Flat-maps each of 1,000,000 outer values into a two-element range (flatMap
 * called with the extra arguments {@code false, 128, 1}), routes the merged
 * stream through {@code publishOn}, and expects twice as many items followed
 * by completion.
 */
@Test
public void crossRange() {
  int outerCount = 1000000;
  long expectedItems = 2 * outerCount;

  Flux<Integer> merged = Flux.range(1, outerCount)
                             .flatMap(v -> Flux.range(v, 2), false, 128, 1);

  StepVerifier.create(merged.publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(expectedItems)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Same shape as the basic cross-range case, but flatMap is called with the
 * extra arguments {@code false, 128, 32}. Expects 2,000,000 items followed by
 * completion after {@code publishOn}.
 */
@Test
public void crossRangeMax() {
  int outerCount = 1000000;
  long expectedItems = 2 * outerCount;

  Flux<Integer> merged = Flux.range(1, outerCount)
                             .flatMap(v -> Flux.range(v, 2), false, 128, 32);

  StepVerifier.create(merged.publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(expectedItems)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Cross-range case using the single-argument {@code flatMap} overload.
 * Expects 2,000,000 items followed by completion after {@code publishOn}.
 */
@Test
public void crossRangeMaxUnbounded() {
  int outerCount = 1000000;
  long expectedItems = 2 * outerCount;

  Flux<Integer> merged = Flux.range(1, outerCount)
                             .flatMap(v -> Flux.range(v, 2));

  StepVerifier.create(merged.publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(expectedItems)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Cross-range case in which the outer source, each inner range, and the
 * merged result all have {@code hide()} applied; flatMap is called with the
 * extra arguments {@code false, 4, 32}. Expects 2,000,000 items followed by
 * completion after {@code publishOn}.
 */
@Test
public void crossRangeMaxHidden() throws Exception {
  int outerCount = 1000000;
  long expectedItems = 2 * outerCount;

  Flux<Integer> hiddenOuter = Flux.range(1, outerCount).hide();
  Flux<Integer> merged = hiddenOuter.flatMap(v -> Flux.range(v, 2).hide(), false, 4, 32)
                                    .hide();

  StepVerifier.create(merged.publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(expectedItems)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Cross-range case in which the outer source, each inner range, and the
 * merged result all have {@code hide()} applied; flatMap is called with the
 * extra arguments {@code false, 128, 1}. Expects 2,000,000 items followed by
 * completion after {@code publishOn}.
 */
@Test
public void crossRangeHidden() {
  int outerCount = 1000000;
  long expectedItems = 2 * outerCount;

  Flux<Integer> hiddenOuter = Flux.range(1, outerCount).hide();
  Flux<Integer> merged = hiddenOuter.flatMap(v -> Flux.range(v, 2).hide(), false, 128, 1)
                                    .hide();

  StepVerifier.create(merged.publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(expectedItems)
        .verifyComplete();
}

相关文章