reactor.core.publisher.Flux.parallel()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.1k)|赞(0)|评价(0)|浏览(388)

本文整理了Java中reactor.core.publisher.Flux.parallel()方法的一些代码示例,展示了Flux.parallel()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.parallel()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:parallel

Flux.parallel介绍

[英]Prepare this Flux by dividing data on a number of 'rails' matching the number of CPU cores, in a round-robin fashion. Note that to actually perform the work in parallel, you should call ParallelFlux#runOn(Scheduler) afterward.
[中]通过以循环方式将数据划分到与CPU内核数量相匹配的多个“rails”上来准备此流量。请注意,要实际并行执行工作,应该在之后调用ParallelFlux#runOn(调度程序)。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
public void advancedParallelJustDivided() {
  Flux.range(1, 10)
    .parallel(2) //<1>
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
}

代码示例来源:origin: reactor/reactor-core

@Override
@SuppressWarnings("unchecked")
protected ParallelFlux<I> sourceCallable(OperatorScenario<I, ParallelFlux<I>, O, ParallelFlux<O>> scenario) {
  if(scenario.producerCount() == 0){
    return (ParallelFlux<I>)Mono.fromRunnable(() -> {})
      .flux()
      .parallel(4);
  }
  return (ParallelFlux<I>) Mono.fromCallable(() -> scenario.producingMapper.apply(0))
                 .flux()
                 .parallel(4);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testPrefetch() {
  assertThat(Flux.range(1, 10)
          .parallel(3)
          .collect(ArrayList::new, List::add)
          .getPrefetch()).isEqualTo(Integer.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void advancedParallelParallelized() {
  Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void parallelism() {
  ParallelFlux<Integer> source = Flux.just(500, 300).parallel(10);
  ParallelPeek<Integer> test = new ParallelPeek<>(source, null, null, null, null, null, null, null, null);
  assertThat(test.parallelism())
      .isEqualTo(source.parallelism())
      .isEqualTo(10);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void parallelism() {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelFluxName<Integer> test = new ParallelFluxName<>(source, "foo", null);
  assertThat(test.parallelism())
      .isEqualTo(3)
      .isEqualTo(source.parallelism());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void parallelism() {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelFluxHide<Integer> test = new ParallelFluxHide<>(source);
  assertThat(test.parallelism())
      .isEqualTo(3)
      .isEqualTo(source.parallelism());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelMap<Integer, String> test = new ParallelMap<>(source, i -> "" + i);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(-1);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void scanUnsafe() {
    ParallelFlux<Integer> source = Flux.range(1, 10)
                      .parallel(2);
    ParallelMergeOrdered<Integer> test = new ParallelMergeOrdered<>(source, 123, null, null);

    assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
    assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(123);

    assertThat(test.scan(Scannable.Attr.NAME)).isNull();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() {
  ParallelFlux<Integer> source = Flux.just(500, 300).parallel(10);
  ParallelMergeSequential<Integer> test = new ParallelMergeSequential<>(source, 123, Queues.one());
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(123);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperatorErrorModeEnd() throws Exception {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelConcatMap<Integer, Integer> test = new ParallelConcatMap<>(source,
      i -> Flux.range(1, i), Queues.small(), 123,
      FluxConcatMap.ErrorMode.END);
  assertThat(test.scan(Scannable.Attr.DELAY_ERROR)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void parallelism() {
  ParallelFlux<String> source = Flux.<String>empty().parallel(2);
  ParallelRunOn<String> test = new ParallelRunOn<>(source, Schedulers.single(), 123, Queues.small());
  assertThat(test.parallelism()).isEqualTo(2);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperatorErrorModeBoundary() throws Exception {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelConcatMap<Integer, Integer> test = new ParallelConcatMap<>(source,
      i -> Flux.range(1, i), Queues.small(), 123,
      FluxConcatMap.ErrorMode.BOUNDARY);
  assertThat(test.scan(Scannable.Attr.DELAY_ERROR)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperatorNullTags() throws Exception {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelFluxName<Integer> test = new ParallelFluxName<>(source, "foo", null);
  assertThat(test.scan(Scannable.Attr.TAGS)).isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failInitial() {
  Supplier<List<Integer>> as = () -> {
    throw new RuntimeException("test");
  };
  StepVerifier.create(Flux.range(1, 10)
              .parallel(3)
              .collect(as, List::add))
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failCombination() {
  StepVerifier.create(Flux.range(1, 10)
              .parallel(3)
              .collect(() -> 0, (a, b) -> {
                throw new RuntimeException("test");
              }))
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() throws Exception {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelFilter<Integer> test = new ParallelFilter<>(source, i -> i % 2 == 0);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(test.scan(Scannable.Attr.PREFETCH))
      .isEqualTo(-1)
      .isNotEqualTo(source.getPrefetch());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() throws Exception {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelConcatMap<Integer, Integer> test = new ParallelConcatMap<>(source,
      i -> Flux.range(1, i), Queues.small(), 123,
      FluxConcatMap.ErrorMode.IMMEDIATE);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(test.scan(Scannable.Attr.PREFETCH))
      .isEqualTo(123)
      .isNotEqualTo(source.getPrefetch());
  assertThat(test.scan(Scannable.Attr.DELAY_ERROR)).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() throws Exception {
  ParallelFlux<Integer> source = Flux.range(1, 4).parallel(3);
  ParallelFlatMap<Integer, Integer> test = new ParallelFlatMap<>(source,
      i -> Flux.range(1, i), true, 12,
      Queues.small(), 123, Queues.small());
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(test.scan(Scannable.Attr.PREFETCH))
      .isEqualTo(123)
      .isNotEqualTo(source.getPrefetch());
  assertThat(test.scan(Scannable.Attr.DELAY_ERROR)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void groupMerge() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .parallel()
    .groups()
    .flatMap(v -> v)
    .subscribe(ts);
  ts.assertContainValues(new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
   .assertNoError()
   .assertComplete();
}

相关文章

微信公众号

最新文章

更多

Flux类方法