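This article collects code examples for the Java method reactor.core.scheduler.Schedulers.parallel() and shows how Schedulers.parallel() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Schedulers.parallel() method are as follows: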
Package path: reactor.core.scheduler.Schedulers
Class name: Schedulers
Method name: parallel
Description: Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
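The description above mentions "a fixed pool of single-threaded ExecutorService-based workers". The following is a minimal plain-JDK sketch of that idea, not Reactor's actual implementation: the class name `FixedWorkerPoolSketch`, the thread-name prefix `sketch-parallel-`, the pool size of 2, and the round-robin worker selection are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; this is not Reactor's implementation. */
public class FixedWorkerPoolSketch implements AutoCloseable {

    // One single-threaded executor per worker; the array size never changes.
    private final ExecutorService[] workers;
    private final AtomicInteger next = new AtomicInteger();

    public FixedWorkerPoolSketch(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be > 0 but was " + parallelism);
        }
        workers = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            final String name = "sketch-parallel-" + (i + 1);
            workers[i] = Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable, name);
                thread.setDaemon(true);
                return thread;
            });
        }
    }

    /** Picks the next worker in round-robin order (an assumed selection policy). */
    public ExecutorService pickWorker() {
        int index = Math.floorMod(next.getAndIncrement(), workers.length);
        return workers[index];
    }

    /** Submits taskCount tasks, one per picked worker, and returns the thread name each ran on. */
    public List<String> threadNamesFor(int taskCount) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            futures.add(pickWorker().submit(task));
        }
        List<String> names = new ArrayList<>();
        for (Future<String> future : futures) {
            names.add(future.get(5, TimeUnit.SECONDS));
        }
        return names;
    }

    @Override
    public void close() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        try (FixedWorkerPoolSketch pool = new FixedWorkerPoolSketch(2)) {
            // Tasks alternate between the two workers.
            System.out.println(pool.threadNamesFor(4));
        }
    }
}
```

Because each worker owns exactly one thread, tasks handed to the same worker run one after another, while tasks on different workers can run concurrently. In real code you would call Schedulers.parallel() rather than build such a pool by hand.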
Code example source: resilience4j/resilience4j
/**
 * Creates a RateLimiterOperator.
 *
 * @param <T> the value type of the upstream and downstream
 * @param rateLimiter the Rate limiter
 * @return a RateLimiterOperator
 */
public static <T> RateLimiterOperator<T> of(RateLimiter rateLimiter) {
    // Delegates to the two-argument overload, passing the shared parallel scheduler.
    return of(rateLimiter, Schedulers.parallel());
}
Code example source: resilience4j/resilience4j
/**
 * Creates a BulkheadOperator.
 *
 * @param <T> the value type of the upstream and downstream
 * @param bulkhead the Bulkhead
 * @return a BulkheadOperator
 */
public static <T> BulkheadOperator<T> of(Bulkhead bulkhead) {
    // Delegates to the two-argument overload, passing the shared parallel scheduler.
    return of(bulkhead, Schedulers.parallel());
}
Code example source: reactor/reactor-core
@Test
public void sourceCachedNoCoordinatorLeak() {
    TestPublisher<Integer> source = TestPublisher.create();
    MonoCacheTime<Integer> cached = new MonoCacheTime<>(source.mono(), Duration.ofSeconds(2),
            Schedulers.parallel());
    cached.subscribe();
    WeakReference<Signal<Integer>> refCoordinator = new WeakReference<>(cached.state);
    assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);
    source.emit(100);
    System.gc();
    assertThat(refCoordinator.get()).isNull();
}
Code example source: reactor/reactor-core
@Test
public void scanOperator() {
    // Flux.interval(Duration) uses Schedulers.parallel() as its default timer.
    final Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
    assertThat(interval).isInstanceOf(Scannable.class);
    assertThat(((Scannable) interval).scan(Scannable.Attr.RUN_ON))
            .isSameAs(Schedulers.parallel());
}
Code example source: reactor/reactor-core
@Test
public void advancedParallelParallelized() {
    // Split the sequence into 2 rails and run each rail on a parallel worker.
    Flux.range(1, 10)
        .parallel(2)
        .runOn(Schedulers.parallel())
        .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
}
Code example source: reactor/reactor-core
@Test
public void scanOperator() {
    // bufferTimeout(int, Duration) uses Schedulers.parallel() as its default timer.
    final Flux<List<Integer>> flux = Flux.just(1).bufferTimeout(3, Duration.ofSeconds(1));
    assertThat(flux).isInstanceOf(Scannable.class);
    assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.parallel());
}
Code example source: reactor/reactor-core
@Test
public void testSubscribeOnValueFusion() {
    StepVerifier.create(Mono.just(1)
                            .flatMapMany(f -> Mono.just(f + 1)
                                                  .subscribeOn(Schedulers.parallel())
                                                  .map(this::slow)))
                .expectFusion(Fuseable.ASYNC, Fuseable.NONE)
                .expectNext(2)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fromPublishersSequentialSubscribe() {
    // A synchronized list is used because the rails add values from different threads.
    List<Integer> values = Collections.synchronizedList(new ArrayList<>(10));
    ParallelFlux.from(Flux.range(1, 3), Flux.range(4, 3))
                .runOn(Schedulers.parallel())
                .doOnNext(values::add)
                .sequential()
                .blockLast();
    assertThat(values)
            .hasSize(6)
            .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}
Code example source: reactor/reactor-core
Flux<Integer> flatMapScenario() {
    return Flux.interval(Duration.ofSeconds(3))
               .flatMap(v -> Flux.fromIterable(Arrays.asList("A"))
                                 .flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
                                                   .subscribeOn(Schedulers.parallel())
                                                   .flatMapMany(Flux::fromIterable)))
               .log();
}
Code example source: reactor/reactor-core
@Test
public void runOnZeroPrefetchRejected() {
    ParallelFlux<Integer> validSoFar = ParallelFlux.from(Mono.just(1));
    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
              .isThrownBy(() -> validSoFar.runOn(Schedulers.parallel(), 0))
              .withMessage("prefetch > 0 required but it was 0");
}
Code example source: reactor/reactor-core
@Test
public void runOnNegativePrefetchRejected() {
    ParallelFlux<Integer> validSoFar = ParallelFlux.from(Mono.just(1));
    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
              .isThrownBy(() -> validSoFar.runOn(Schedulers.parallel(), -1))
              .withMessage("prefetch > 0 required but it was -1");
}
Code example source: reactor/reactor-core
@Test
public void createStreamFromMonoCreate2() {
    StepVerifier.create(Mono.create(MonoSink::success)
                            .publishOn(Schedulers.parallel()))
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void transformChangesPrefetch() {
    assertThat(ParallelFlux.from(Flux.range(1, 10), 3, 12, Queues.small())
                           .transform(pf -> pf.runOn(Schedulers.parallel(), 3)
                                              .log()
                                              .hide())
                           .getPrefetch())
            .isEqualTo(3);
}
Code example source: reactor/reactor-core
@Test
public void expectNextAsync() {
    Flux<String> flux = Flux.just("foo", "bar")
                            .publishOn(Schedulers.parallel());
    StepVerifier.create(flux)
                .expectNext("foo")
                .expectNext("bar")
                .expectComplete()
                .verify();
}
Code example source: reactor/reactor-core
@Test
public void missingNextAsync() {
    Flux<String> flux = Flux.just("foo", "bar")
                            .publishOn(Schedulers.parallel());
    assertThatExceptionOfType(AssertionError.class)
            .isThrownBy(() -> StepVerifier.create(flux)
                                          .expectNext("foo")
                                          .expectComplete()
                                          .verify())
            .withMessage("expectation \"expectComplete\" failed (expected: onComplete(); actual: onNext(bar))");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateSerialized2() {
    StepVerifier.create(Flux.create(s -> {
                    s.next("test1");
                    s.next("test2");
                    s.next("test3");
                    s.complete();
                }).publishOn(Schedulers.parallel()))
                .expectNext("test1", "test2", "test3")
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateDrop2() {
    StepVerifier.create(Flux.create(s -> {
                    s.next("test1");
                    s.next("test2");
                    s.next("test3");
                    s.complete();
                }, FluxSink.OverflowStrategy.DROP).publishOn(Schedulers.parallel()))
                .expectNext("test1", "test2", "test3")
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateLatest2() {
    StepVerifier.create(Flux.create(s -> {
                    s.next("test1");
                    s.next("test2");
                    s.next("test3");
                    s.complete();
                }, FluxSink.OverflowStrategy.LATEST).publishOn(Schedulers.parallel()))
                .expectNext("test1", "test2", "test3")
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateIgnore2() {
    StepVerifier.create(Flux.create(s -> {
                    s.next("test1");
                    s.next("test2");
                    s.next("test3");
                    s.complete();
                }, FluxSink.OverflowStrategy.IGNORE).publishOn(Schedulers.parallel()))
                .expectNext("test1", "test2", "test3")
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateError2() {
    StepVerifier.create(Flux.create(s -> {
                    s.next("test1");
                    s.next("test2");
                    s.next("test3");
                    s.complete();
                }, FluxSink.OverflowStrategy.ERROR).publishOn(Schedulers.parallel()))
                .expectNext("test1", "test2", "test3")
                .verifyComplete();
}