Usage and code examples for the reactor.core.scheduler.Schedulers.newParallel() method

x33g5p2x, reposted on 2022-01-30 under Other

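This article collects code examples for the Java method reactor.core.scheduler.Schedulers.newParallel() and shows how Schedulers.newParallel() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Schedulers.newParallel() method:
Fully qualified name: reactor.core.scheduler.Schedulers
Class name: Schedulers
Method name: newParallel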

About Schedulers.newParallel

Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.

Code examples

Code example source: reactor/reactor-core

/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name) {
  return newParallel(name, DEFAULT_POOL_SIZE);
}

Code example source: reactor/reactor-core

/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 * @param parallelism Number of pooled workers.
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name, int parallelism) {
  return newParallel(name, parallelism, false);
}

Code example source: reactor/reactor-core

@Override
protected Scheduler scheduler() {
  return Schedulers.newParallel("ParallelSchedulerTest");
}

Code example source: reactor/reactor-core

/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 * @param parallelism Number of pooled workers.
 * @param daemon false if the {@link Scheduler} requires an explicit {@link
 * Scheduler#dispose()} to exit the VM.
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name, int parallelism, boolean daemon) {
  return newParallel(parallelism,
      new ReactorThreadFactory(name, ParallelScheduler.COUNTER, daemon,
          true, Schedulers::defaultUncaughtException));
}
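
The daemon flag in the three-argument overload decides whether the pooled threads keep the JVM alive: with non-daemon threads, the VM only exits once the scheduler is disposed. The sketch below is JDK-only and hypothetical. The class NamedThreadFactorySketch and its "prefix-N" naming scheme are assumptions made for illustration, not Reactor's ReactorThreadFactory.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical stand-in for a naming thread factory (NOT Reactor's
 * ReactorThreadFactory). It gives every thread a common prefix and
 * applies the requested daemon flag.
 */
class NamedThreadFactorySketch implements ThreadFactory {

    private final String prefix;
    private final boolean daemon;
    private final AtomicLong counter = new AtomicLong();

    NamedThreadFactorySketch(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Assumed naming scheme for this sketch: "<prefix>-<sequence number>".
        Thread thread = new Thread(task, prefix + "-" + counter.incrementAndGet());
        // Daemon threads do not prevent the JVM from exiting;
        // non-daemon threads keep it alive until they terminate.
        thread.setDaemon(daemon);
        return thread;
    }
}
```

A factory of this shape could be handed to a JDK executor, for example Executors.newSingleThreadExecutor(new NamedThreadFactorySketch("work", true)).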

Code example source: reactor/reactor-core

@BeforeMethod
public void init() {
  sharedGroup = Schedulers.newParallel("fluxion-tck", 2);
}

Code example source: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void negativeParallelism() throws Exception {
  Schedulers.newParallel("test", -1);
}

Code example source: reactor/reactor-core

@Test
public void disablingMetricsRemovesSchedulerMeters() {
  Schedulers.newParallel("A", 1);
  Schedulers.newParallel("A", 1);
  Schedulers.newParallel("A", 1);

  Metrics.globalRegistry.counter("foo", "tagged", "bar");

  Schedulers.disableMetrics();

  assertThat(simpleMeterRegistry.getMeters()
                 .stream()
                 .map(m -> m.getId().getName())
                 .distinct())
      .containsExactly("foo");
}

Code example source: reactor/reactor-core

@Test
public void metricsActivatedHandleNamingClash() {
  Schedulers.newParallel("A", 1);
  Schedulers.newParallel("A", 1);
  Schedulers.newParallel("A", 1);
  assertThat(simpleMeterRegistry.getMeters()
                 .stream()
                 .map(m -> m.getId().getTag("name"))
                 .distinct())
      .containsOnly(
          "parallel(1,\"A\")-0",
          "parallel(1,\"A\")#1-0",
          "parallel(1,\"A\")#2-0"
      );
}

Code example source: reactor/reactor-core

@Test
public void shouldPickEvenly() throws Exception {
  int n = 4;
  int m = 25;
  Scheduler scheduler = Schedulers.newParallel("test", n);
  CountDownLatch latch = new CountDownLatch(m*n);
  ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
  for (int i = 0; i < m * n; i++) {
    scheduler.schedule(() -> {
      String threadName = Thread.currentThread().getName();
      map.compute(threadName, (name, val) -> Optional.ofNullable(val).map(x -> x+1).orElse(1));
      latch.countDown();
    });
  }
  latch.await();
  assertThat(map.values()).containsOnly(m);
}
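
The test above expects each of the n workers to receive exactly m tasks, which is what round-robin selection over a fixed pool of single-threaded executors produces. The following is a minimal JDK-only sketch of that idea. RoundRobinPoolSketch and countTasksPerThread are hypothetical names, and the code is an illustration under those assumptions, not Reactor's ParallelScheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: a fixed pool of single-threaded executors picked round-robin. */
class RoundRobinPoolSketch {

    private final List<ExecutorService> workers = new ArrayList<>();
    private final AtomicLong next = new AtomicLong();

    RoundRobinPoolSketch(int parallelism) {
        // Reject non-positive sizes, mirroring the negativeParallelism test.
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be > 0, was " + parallelism);
        }
        for (int i = 0; i < parallelism; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Hands the task to the next executor in rotation. */
    void schedule(Runnable task) {
        int index = (int) (next.getAndIncrement() % workers.size());
        workers.get(index).execute(task);
    }

    void dispose() {
        workers.forEach(ExecutorService::shutdown);
    }

    /** Runs parallelism * tasksPerWorker tasks and counts how many ran on each thread. */
    static Map<String, Integer> countTasksPerThread(int parallelism, int tasksPerWorker) {
        RoundRobinPoolSketch pool = new RoundRobinPoolSketch(parallelism);
        int total = parallelism * tasksPerWorker;
        CountDownLatch latch = new CountDownLatch(total);
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        try {
            for (int i = 0; i < total; i++) {
                pool.schedule(() -> {
                    counts.merge(Thread.currentThread().getName(), 1, Integer::sum);
                    latch.countDown();
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.dispose();
        }
        return counts;
    }
}
```

Calling RoundRobinPoolSketch.countTasksPerThread(4, 25) from a single thread yields a map with one entry per worker thread, each holding the same count, because the submitting loop advances the rotation by exactly one slot per task.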

Code example source: reactor/reactor-core

@BeforeClass
public static void loadEnv() {
  ioGroup = Schedulers.newElastic("work");
  asyncGroup = Schedulers.newParallel("parallel", 4);
}

Code example source: reactor/reactor-core

@Test
public void metricsActivatedHasDistinctNameTags() {
  Schedulers.newParallel("A", 3);
  Schedulers.newParallel("B", 2);
  assertThat(simpleMeterRegistry.getMeters()
                 .stream()
                 .map(m -> m.getId().getTag("name"))
                 .distinct())
      .containsOnly(
          "parallel(3,\"A\")-0",
          "parallel(3,\"A\")-1",
          "parallel(3,\"A\")-2",
          "parallel(2,\"B\")-0",
          "parallel(2,\"B\")-1"
      );
}

Code example source: reactor/reactor-core

@Test
public void restartParallel() {
  restart(Schedulers.newParallel("test"));
}

Code example source: reactor/reactor-core

@Test
public void testRejectingParallelScheduler() {
  assertRejectingScheduler(Schedulers.newParallel("test"));
}

Code example source: reactor/reactor-core

@Test(timeout = 5000)
public void parallelSchedulerThreadCheck() throws Exception{
  Scheduler s = Schedulers.newParallel("work", 2);
  try {
    Scheduler.Worker w = s.createWorker();
    Thread currentThread = Thread.currentThread();
    AtomicReference<Thread> taskThread = new AtomicReference<>(currentThread);
    CountDownLatch latch = new CountDownLatch(1);
    w.schedule(() -> {
      taskThread.set(Thread.currentThread());
      latch.countDown();
    });
    latch.await();
    assertThat(taskThread.get()).isNotEqualTo(currentThread);
  }
  finally {
    s.dispose();
  }
}
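
The test above checks that a task scheduled on a Worker runs on a thread other than the caller's. A Worker is also expected to run its tasks one at a time in FIFO order, and a single-threaded executor gives the same guarantee. The JDK-only sketch below illustrates both properties. WorkerOrderSketch and runInOrder are hypothetical names, and the code is a stand-in for the concept rather than Reactor's Worker implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a single-threaded executor standing in for a Worker. */
class WorkerOrderSketch {

    /**
     * Submits taskCount numbered tasks and returns the ids in the order they ran.
     * An id is recorded only if the task ran off the calling thread.
     */
    static List<Integer> runInOrder(int taskCount) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        Thread caller = Thread.currentThread();
        try {
            for (int i = 0; i < taskCount; i++) {
                int id = i;
                worker.execute(() -> {
                    if (Thread.currentThread() != caller) {
                        seen.add(id);
                    }
                });
            }
            // Stop accepting new tasks, then wait for the queued ones to finish.
            worker.shutdown();
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
        return new ArrayList<>(seen);
    }
}
```

Because one thread drains the queue, the returned list preserves submission order, which is the property that makes per-worker state safe to use without extra locking.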

Code example source: reactor/reactor-core

@Test
public void metricsActivatedHasDistinctSchedulerIdTags() {
  Schedulers.newParallel("A", 4);
  Schedulers.newParallel("A", 4);
  Schedulers.newParallel("A", 3);
  Schedulers.newSingle("B");
  Schedulers.newElastic("C").createWorker();
  assertThat(simpleMeterRegistry.getMeters()
                 .stream()
                 .map(m -> m.getId().getTag(SchedulerMetricDecorator.TAG_SCHEDULER_ID))
                 .distinct())
      .containsOnly(
          "parallel(4,\"A\")",
          "parallel(4,\"A\")#1",
          "parallel(3,\"A\")",
          "single(\"B\")",
          "elastic(\"C\")"
      );
}

Code example source: reactor/reactor-core

@Test
public void recursiveParallelCall() throws Exception {
  Scheduler s = Schedulers.newParallel("work", 4);
  try {
    Scheduler.Worker w = s.createWorker();
    CountDownLatch latch = new CountDownLatch(2);
    w.schedule(() -> recursiveCall(w, latch, 0));
    latch.await();
  }
  finally {
    s.dispose();
  }
}

Code example source: reactor/reactor-core

@Test
public void scanCapacity() {
  Scheduler scheduler = Schedulers.newParallel(12, Thread::new);

  try {
    assertThat(scheduler)
        .matches(s -> Scannable.from(s).isScanAvailable(), "isScanAvailable")
        .satisfies(s -> assertThat(Scannable.from(s).scan(Scannable.Attr.CAPACITY)).isEqualTo(12));
  }
  finally {
    scheduler.dispose();
  }
}

Code example source: reactor/reactor-core

@Test
public void gh783() {
  int size = 1;
  Scheduler parallel = Schedulers.newParallel("gh-783");
  StepVerifier.withVirtualTime(() -> Flux.just("Oops")
                      .take(size)
                      .subscribeOn(parallel)
                      .flatMap(message -> {
                        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
                        return interval.map( tick -> message);
                      })
                      .take(size)
                      .collectList()
  )
        .thenAwait(Duration.ofHours(1))
        .consumeNextWith(list -> assertThat(list).hasSize(size))
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void allEnabled() {
  Assert.assertFalse(Schedulers.newParallel("") instanceof VirtualTimeScheduler);
  Assert.assertFalse(Schedulers.newElastic("") instanceof VirtualTimeScheduler);
  Assert.assertFalse(Schedulers.newSingle("") instanceof VirtualTimeScheduler);
  VirtualTimeScheduler.getOrSet();
  Assert.assertTrue(Schedulers.newParallel("") instanceof VirtualTimeScheduler);
  Assert.assertTrue(Schedulers.newElastic("") instanceof VirtualTimeScheduler);
  Assert.assertTrue(Schedulers.newSingle("") instanceof VirtualTimeScheduler);
  VirtualTimeScheduler t = VirtualTimeScheduler.get();
  Assert.assertSame(Schedulers.newParallel(""), t);
  Assert.assertSame(Schedulers.newElastic(""), t);
  Assert.assertSame(Schedulers.newSingle(""), t);
}

Code example source: reactor/reactor-core

@Test
public void testOverride() throws InterruptedException {
  TestSchedulers ts = new TestSchedulers(true);
  Schedulers.setFactory(ts);
  Assert.assertEquals(ts.single, Schedulers.newSingle("unused"));
  Assert.assertEquals(ts.elastic, Schedulers.newElastic("unused"));
  Assert.assertEquals(ts.parallel, Schedulers.newParallel("unused"));
  Schedulers.resetFactory();
  Scheduler s = Schedulers.newSingle("unused");
  s.dispose();
  Assert.assertNotSame(ts.single, s);
}
