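This article collects code examples for the Java method reactor.core.scheduler.Schedulers.newParallel() and shows how Schedulers.newParallel() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of the Schedulers.newParallel() method are as follows: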
Fully qualified class: reactor.core.scheduler.Schedulers
Class name: Schedulers
Method name: newParallel
Description: Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
Code example source: reactor/reactor-core
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name) {
    return newParallel(name, DEFAULT_POOL_SIZE);
}
Code example source: reactor/reactor-core
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 * @param parallelism Number of pooled workers.
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name, int parallelism) {
    return newParallel(name, parallelism, false);
}
Code example source: reactor/reactor-core
@Override
protected Scheduler scheduler() {
    return Schedulers.newParallel("ParallelSchedulerTest");
}
Code example source: reactor/reactor-core
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 * @param parallelism Number of pooled workers.
 * @param daemon false if the {@link Scheduler} requires an explicit {@link
 * Scheduler#dispose()} to exit the VM.
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name, int parallelism, boolean daemon) {
    return newParallel(parallelism,
            new ReactorThreadFactory(name, ParallelScheduler.COUNTER, daemon,
                    true, Schedulers::defaultUncaughtException));
}
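The three overloads above differ only in how much they default. As a minimal usage sketch (assuming reactor-core is on the classpath; the class name `NewParallelUsage` and the method `threadNamesSeen` are hypothetical and chosen for illustration), the following runs a small pipeline on a dedicated parallel Scheduler and disposes it afterwards:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of reactor-core.
class NewParallelUsage {

    /** Runs a small pipeline on a dedicated parallel Scheduler and returns the distinct thread names observed. */
    static List<String> threadNamesSeen() {
        // "demo" is the thread-name prefix, 2 is the number of pooled workers,
        // and daemon = true means the threads do not keep the JVM alive on their own.
        Scheduler scheduler = Schedulers.newParallel("demo", 2, true);
        try {
            return Flux.range(1, 5)
                    .publishOn(scheduler)                       // downstream operators run on one worker of the pool
                    .map(i -> Thread.currentThread().getName()) // record which thread handled each element
                    .distinct()
                    .collectList()
                    .block();                                   // blocking here is fine: we are on the caller's thread
        } finally {
            scheduler.dispose();                                // release the pooled threads
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNamesSeen());
    }
}
```

Because `publishOn` obtains a single worker from the Scheduler, all five elements are expected to be handled by the same pooled thread, whose name starts with the `demo` prefix. With `daemon` set to false, as in the one- and two-argument overloads, forgetting `dispose()` can keep the JVM from exiting.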
Code example source: reactor/reactor-core
@BeforeMethod
public void init() {
    sharedGroup = Schedulers.newParallel("fluxion-tck", 2);
}
Code example source: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void negativeParallelism() throws Exception {
    Schedulers.newParallel("test", -1);
}
Code example source: reactor/reactor-core
@Test
public void disablingMetricsRemovesSchedulerMeters() {
    Schedulers.newParallel("A", 1);
    Schedulers.newParallel("A", 1);
    Schedulers.newParallel("A", 1);
    Metrics.globalRegistry.counter("foo", "tagged", "bar");
    Schedulers.disableMetrics();
    assertThat(simpleMeterRegistry.getMeters()
            .stream()
            .map(m -> m.getId().getName())
            .distinct())
            .containsExactly("foo");
}
Code example source: reactor/reactor-core
@Test
public void metricsActivatedHandleNamingClash() {
    Schedulers.newParallel("A", 1);
    Schedulers.newParallel("A", 1);
    Schedulers.newParallel("A", 1);
    assertThat(simpleMeterRegistry.getMeters()
            .stream()
            .map(m -> m.getId().getTag("name"))
            .distinct())
            .containsOnly(
                    "parallel(1,\"A\")-0",
                    "parallel(1,\"A\")#1-0",
                    "parallel(1,\"A\")#2-0"
            );
}
Code example source: reactor/reactor-core
@Test
public void shouldPickEvenly() throws Exception {
    int n = 4;
    int m = 25;
    Scheduler scheduler = Schedulers.newParallel("test", n);
    CountDownLatch latch = new CountDownLatch(m * n);
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    for (int i = 0; i < m * n; i++) {
        scheduler.schedule(() -> {
            String threadName = Thread.currentThread().getName();
            map.compute(threadName, (name, val) -> Optional.ofNullable(val).map(x -> x + 1).orElse(1));
            latch.countDown();
        });
    }
    latch.await();
    assertThat(map.values()).containsOnly(m);
}
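The test above expects each of the n threads to run exactly m tasks, which is what round-robin selection over a fixed pool of single-threaded executors would produce. The following is a minimal sketch of that idea using only java.util.concurrent. It is an illustration under that assumption, not Reactor's actual ParallelScheduler, and the names `RoundRobinPool` and `countTasksPerThread` are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; this is not Reactor's ParallelScheduler.
final class RoundRobinPool {
    private final ExecutorService[] workers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinPool(String name, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        workers = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            String threadName = name + "-" + (i + 1);
            // Each worker owns exactly one thread, so tasks given to it run sequentially.
            workers[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        }
    }

    /** Hands the task to the next worker in round-robin order. */
    void schedule(Runnable task) {
        int index = Math.floorMod(next.getAndIncrement(), workers.length);
        workers[index].execute(task);
    }

    /** Lets queued tasks finish, then stops the worker threads. */
    void dispose() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }

    /** Submits parallelism * tasksPerWorker tasks and counts how many ran on each thread. */
    static Map<String, Integer> countTasksPerThread(int parallelism, int tasksPerWorker) {
        RoundRobinPool pool = new RoundRobinPool("sketch", parallelism);
        int total = parallelism * tasksPerWorker;
        CountDownLatch latch = new CountDownLatch(total);
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        try {
            for (int i = 0; i < total; i++) {
                pool.schedule(() -> {
                    counts.merge(Thread.currentThread().getName(), 1, Integer::sum);
                    latch.countDown();
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.dispose();
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countTasksPerThread(4, 25));
    }
}
```

Calling `countTasksPerThread(4, 25)` yields a map with four thread names, each mapped to 25, mirroring the `containsOnly(m)` assertion in the test.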
Code example source: reactor/reactor-core
@BeforeClass
public static void loadEnv() {
    ioGroup = Schedulers.newElastic("work");
    asyncGroup = Schedulers.newParallel("parallel", 4);
}
Code example source: reactor/reactor-core
@Test
public void metricsActivatedHasDistinctNameTags() {
    Schedulers.newParallel("A", 3);
    Schedulers.newParallel("B", 2);
    assertThat(simpleMeterRegistry.getMeters()
            .stream()
            .map(m -> m.getId().getTag("name"))
            .distinct())
            .containsOnly(
                    "parallel(3,\"A\")-0",
                    "parallel(3,\"A\")-1",
                    "parallel(3,\"A\")-2",
                    "parallel(2,\"B\")-0",
                    "parallel(2,\"B\")-1"
            );
}
Code example source: reactor/reactor-core
@Test
public void restartParallel() {
    restart(Schedulers.newParallel("test"));
}
Code example source: reactor/reactor-core
@Test
public void testRejectingParallelScheduler() {
    assertRejectingScheduler(Schedulers.newParallel("test"));
}
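The Javadoc quoted earlier states that this type of Scheduler detects and rejects usage of blocking Reactor APIs. A minimal sketch of what that looks like from user code follows, assuming reactor-core is on the classpath. The class name `BlockingRejectionDemo` and its method are hypothetical, and the exception type reflects the behavior of reactor-core versions I am aware of rather than a documented contract:

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of reactor-core.
class BlockingRejectionDemo {

    /** Calls block() from a thread owned by a parallel Scheduler and returns the resulting error, or null if none. */
    static RuntimeException errorFromBlockingOnParallelThread() {
        Scheduler scheduler = Schedulers.newParallel("no-block", 1, true);
        try {
            Mono.fromCallable(() -> Mono.delay(Duration.ofMillis(10)).block()) // inner block() runs on a "no-block" thread
                    .subscribeOn(scheduler)
                    .block(); // outer block() runs on the caller's thread, which is allowed to block
            return null;      // reached only if the inner block() was not rejected
        } catch (RuntimeException e) {
            return e;         // expected: the rejection propagated through the pipeline
        } finally {
            scheduler.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(errorFromBlockingOnParallelThread());
    }
}
```

In this sketch the inner `block()` is expected to fail with an IllegalStateException instead of parking the pooled thread. The usual alternative is to compose with operators such as `flatMap`, or to move genuinely blocking work to a Scheduler intended for it.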
Code example source: reactor/reactor-core
@Test(timeout = 5000)
public void parallelSchedulerThreadCheck() throws Exception {
    Scheduler s = Schedulers.newParallel("work", 2);
    try {
        Scheduler.Worker w = s.createWorker();
        Thread currentThread = Thread.currentThread();
        AtomicReference<Thread> taskThread = new AtomicReference<>(currentThread);
        CountDownLatch latch = new CountDownLatch(1);
        w.schedule(() -> {
            taskThread.set(Thread.currentThread());
            latch.countDown();
        });
        latch.await();
        assertThat(taskThread.get()).isNotEqualTo(currentThread);
    }
    finally {
        s.dispose();
    }
}
Code example source: reactor/reactor-core
@Test
public void metricsActivatedHasDistinctSchedulerIdTags() {
    Schedulers.newParallel("A", 4);
    Schedulers.newParallel("A", 4);
    Schedulers.newParallel("A", 3);
    Schedulers.newSingle("B");
    Schedulers.newElastic("C").createWorker();
    assertThat(simpleMeterRegistry.getMeters()
            .stream()
            .map(m -> m.getId().getTag(SchedulerMetricDecorator.TAG_SCHEDULER_ID))
            .distinct())
            .containsOnly(
                    "parallel(4,\"A\")",
                    "parallel(4,\"A\")#1",
                    "parallel(3,\"A\")",
                    "single(\"B\")",
                    "elastic(\"C\")"
            );
}
Code example source: reactor/reactor-core
@Test
public void recursiveParallelCall() throws Exception {
    Scheduler s = Schedulers.newParallel("work", 4);
    try {
        Scheduler.Worker w = s.createWorker();
        CountDownLatch latch = new CountDownLatch(2);
        w.schedule(() -> recursiveCall(w, latch, 0));
        latch.await();
    }
    finally {
        s.dispose();
    }
}
Code example source: reactor/reactor-core
@Test
public void scanCapacity() {
    Scheduler scheduler = Schedulers.newParallel(12, Thread::new);
    try {
        assertThat(scheduler)
                .matches(s -> Scannable.from(s).isScanAvailable(), "isScanAvailable")
                .satisfies(s -> assertThat(Scannable.from(s).scan(Scannable.Attr.CAPACITY)).isEqualTo(12));
    }
    finally {
        scheduler.dispose();
    }
}
Code example source: reactor/reactor-core
@Test
public void gh783() {
    int size = 1;
    Scheduler parallel = Schedulers.newParallel("gh-783");
    StepVerifier.withVirtualTime(() -> Flux.just("Oops")
                    .take(size)
                    .subscribeOn(parallel)
                    .flatMap(message -> {
                        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
                        return interval.map(tick -> message);
                    })
                    .take(size)
                    .collectList()
            )
            .thenAwait(Duration.ofHours(1))
            .consumeNextWith(list -> assertThat(list).hasSize(size))
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void allEnabled() {
    Assert.assertFalse(Schedulers.newParallel("") instanceof VirtualTimeScheduler);
    Assert.assertFalse(Schedulers.newElastic("") instanceof VirtualTimeScheduler);
    Assert.assertFalse(Schedulers.newSingle("") instanceof VirtualTimeScheduler);
    VirtualTimeScheduler.getOrSet();
    Assert.assertTrue(Schedulers.newParallel("") instanceof VirtualTimeScheduler);
    Assert.assertTrue(Schedulers.newElastic("") instanceof VirtualTimeScheduler);
    Assert.assertTrue(Schedulers.newSingle("") instanceof VirtualTimeScheduler);
    VirtualTimeScheduler t = VirtualTimeScheduler.get();
    Assert.assertSame(Schedulers.newParallel(""), t);
    Assert.assertSame(Schedulers.newElastic(""), t);
    Assert.assertSame(Schedulers.newSingle(""), t);
}
Code example source: reactor/reactor-core
@Test
public void testOverride() throws InterruptedException {
    TestSchedulers ts = new TestSchedulers(true);
    Schedulers.setFactory(ts);
    Assert.assertEquals(ts.single, Schedulers.newSingle("unused"));
    Assert.assertEquals(ts.elastic, Schedulers.newElastic("unused"));
    Assert.assertEquals(ts.parallel, Schedulers.newParallel("unused"));
    Schedulers.resetFactory();
    Scheduler s = Schedulers.newSingle("unused");
    s.dispose();
    Assert.assertNotSame(ts.single, s);
}