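This article collects code examples for the Java method reactor.core.scheduler.Schedulers.fromExecutorService() and shows how Schedulers.fromExecutorService() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Schedulers.fromExecutorService() are as follows: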
Fully qualified class: reactor.core.scheduler.Schedulers
Class name: Schedulers
Method name: fromExecutorService
Description: Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators.
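The description above can be made concrete with a small sketch of what "a backing ExecutorService" means. The block below uses only java.util.concurrent and does not call Reactor at all: MiniScheduler, BackingExecutorSketch, runOnBackingExecutor, and the thread name "backing-exec" are hypothetical names introduced for illustration, and the wrapper is a simplified stand-in, not Reactor's implementation. It shows that a task handed to the wrapper runs on the executor's thread rather than on the caller's thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: MiniScheduler is NOT a Reactor class.
public class BackingExecutorSketch {

    /** Minimal wrapper that delegates every task to a backing ExecutorService. */
    static final class MiniScheduler {
        private final ExecutorService backing;

        MiniScheduler(ExecutorService backing) {
            this.backing = backing;
        }

        /** Hands the task to the backing executor and returns a cancellable handle. */
        Future<?> schedule(Runnable task) {
            return backing.submit(task);
        }

        /** Stops the backing executor; the wrapper owns no threads of its own. */
        void dispose() {
            backing.shutdownNow();
        }
    }

    /** Runs one task through the wrapper and returns the name of the thread that executed it. */
    static String runOnBackingExecutor() {
        // Assumed setup: a single daemon thread with a recognizable name.
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "backing-exec");
            t.setDaemon(true);
            return t;
        });
        MiniScheduler scheduler = new MiniScheduler(exec);
        String[] seen = new String[1];
        try {
            Future<?> handle = scheduler.schedule(
                    () -> seen[0] = Thread.currentThread().getName());
            handle.get(5, TimeUnit.SECONDS); // wait until the task has finished
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            scheduler.dispose(); // always release the executor's thread
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOnBackingExecutor());
    }
}
```

In the Reactor examples that follow, Schedulers.fromExecutorService(...) plays the role of the wrapper, and operators such as publishOn and subscribeOn decide which signals are moved onto the executor's threads.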
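Code example source: reactor/reactor-core
@BeforeClass
public static void before() {
    // Scheduler backed by a plain single-thread ExecutorService.
    scheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor());
    // Reactor-managed single-thread scheduler, created for comparison.
    nonBlockingScheduler = Schedulers.newSingle("nonBlockingScheduler");
}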
Code example source: reactor/reactor-core
@Test
public void error() {
    // The error signal is delivered through the executor-backed scheduler.
    StepVerifier.create(Flux.error(new RuntimeException("forced failure"))
                            .publishOn(Schedulers.fromExecutorService(exec)))
                .verifyErrorMessage("forced failure");
}
Code example source: reactor/reactor-core
@Test
public void callableEvaluatedTheRightTime() {
    AtomicInteger count = new AtomicInteger();
    Mono<Integer> p = Mono.fromCallable(count::incrementAndGet)
                          .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()));
    // The callable is lazy: nothing has run before subscription.
    Assert.assertEquals(0, count.get());
    p.subscribeWith(AssertSubscriber.create()).await();
    Assert.assertEquals(1, count.get());
}
Code example source: reactor/reactor-core
@Test
public void mappedsyncSourceWithNull2() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.fromIterable(Arrays.asList(1, 2))
        // Returning null from the mapper is expected to surface as a NullPointerException.
        .map(v -> v == 2 ? null : v)
        .publishOn(Schedulers.fromExecutorService(exec))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValues(1)
      .assertError(NullPointerException.class)
      .assertNotComplete();
}
Code example source: reactor/reactor-core
@Test
public void classicEmpty() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.<Integer>empty()
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void classicEmptyBackpressured() throws Exception {
    // Initial request of 0: completion must still arrive without any demand.
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Mono.<Integer>empty()
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void classicJust() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.just(1)
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValues(1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void mappedsyncSourceWithNull() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.just(1, 2)
        .map(v -> v == 2 ? null : v)
        .publishOn(Schedulers.fromExecutorService(exec))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValues(1)
      .assertError(NullPointerException.class)
      .assertNotComplete();
}
Code example source: reactor/reactor-core
@Test
public void mappedsyncSourceWithNullHidden() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.just(1, 2)
        .hide()
        .map(v -> v == 2 ? null : v)
        .publishOn(Schedulers.fromExecutorService(exec))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValues(1)
      .assertError(NullPointerException.class)
      .assertNotComplete();
}
Code example source: reactor/reactor-core
@Test
public void classicJust() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.just(1)
        .publishOn(Schedulers.fromExecutorService(exec))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValues(1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void syncSourceWithNull() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    // The null element in the source is expected to end the sequence with an error.
    Flux.just(1, null, 1)
        .publishOn(Schedulers.fromExecutorService(exec))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValues(1)
      .assertError(NullPointerException.class)
      .assertNotComplete();
}
Code example source: reactor/reactor-core
@Test
public void classic() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Mono.fromSupplier(() -> 1)
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValueCount(1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void classicEmptyBackpressured() throws Exception {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.<Integer>empty()
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void classic() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 1000)
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertValueCount(1000)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void classicEmpty() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>empty()
        .subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool()))
        .subscribe(ts);
    ts.await(Duration.ofSeconds(5));
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void crossRange() {
    int count = 1000000;
    StepVerifier.create(Flux.range(1, count)
                            // Arguments after the mapper: delayError, concurrency, prefetch.
                            .flatMap(v -> Flux.range(v, 2), false, 128, 1)
                            .publishOn(Schedulers.fromExecutorService(exec)))
                .expectNextCount(2 * count)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void crossRangeMax() {
    int count = 1000000;
    StepVerifier.create(Flux.range(1, count)
                            .flatMap(v -> Flux.range(v, 2), false, 128, 32)
                            .publishOn(Schedulers.fromExecutorService(exec)))
                .expectNextCount(2 * count)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void crossRangeMaxUnbounded() {
    int count = 1000000;
    StepVerifier.create(Flux.range(1, count)
                            .flatMap(v -> Flux.range(v, 2))
                            .publishOn(Schedulers.fromExecutorService(exec)))
                .expectNextCount(2 * count)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void crossRangeMaxHidden() throws Exception {
    int count = 1000000;
    StepVerifier.create(Flux.range(1, count)
                            .hide()
                            .flatMap(v -> Flux.range(v, 2)
                                              .hide(), false, 4, 32)
                            .hide()
                            .publishOn(Schedulers.fromExecutorService(exec)))
                .expectNextCount(2 * count)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void crossRangeHidden() {
    int count = 1000000;
    StepVerifier.create(Flux.range(1, count)
                            .hide()
                            .flatMap(v -> Flux.range(v, 2)
                                              .hide(), false, 128, 1)
                            .hide()
                            .publishOn(Schedulers.fromExecutorService(exec)))
                .expectNextCount(2 * count)
                .verifyComplete();
}