Usage and code examples of the reactor.core.publisher.Flux.onErrorStop() method

x33g5p2x, reposted on 2022-01-19 in Other

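This article collects code examples of the Java method reactor.core.publisher.Flux.onErrorStop() and shows how Flux.onErrorStop() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Flux.onErrorStop() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: onErrorStop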

About Flux.onErrorStop

If an #onErrorContinue(BiConsumer) variant has been applied (in the examples below it always appears further down the chain than onErrorStop()), onErrorStop() reverts to the default 'STOP' mode, in which errors are terminal events. It can be used to scope the on-next failure strategy more easily, or to override the strategy inherited by a sub-stream (for example inside a flatMap). It has no effect if #onErrorContinue(BiConsumer) has not been used.
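
The scoping described above can also be tried outside a test harness. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name OnErrorStopScoping, the run() helper, and the event strings are illustrative choices and not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; only Flux and its operators come from Reactor.
class OnErrorStopScoping {

    // Builds the pipeline, subscribes synchronously, and records every event.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flux.just(0, 1, 2, 3)
            // Above onErrorStop(): a failure here terminates the sequence.
            .map(i -> {
                if (i == 3) {
                    throw new IllegalStateException("boom");
                }
                return i;
            })
            .onErrorStop()
            // Between onErrorStop() and onErrorContinue(): a failing element
            // is handed to the onErrorContinue handler and then skipped.
            .map(i -> 10 / i)
            .onErrorContinue((err, value) -> events.add("dropped " + value))
            .subscribe(
                v -> events.add("next " + v),
                e -> events.add("error " + e.getClass().getSimpleName()),
                () -> events.add("complete"));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Under these assumptions the sketch should record a dropped 0 (from 10 / 0), then the values 10 and 5, and finally the IllegalStateException as a terminal error, which mirrors the errorStrategySimpleScoping test shown below.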

Code examples

Code example source: reactor/reactor-core

@Test
public void errorStrategySimpleScoping() {
  Flux<Integer> test = Flux.just(0, 1, 2, 3)
      .map(i -> {
        // Above onErrorStop(): a failure here is terminal.
        if (i == 3) {
          throw new IllegalStateException();
        }
        else {
          return i;
        }
      })
      .onErrorStop()
      // Between onErrorStop() and onErrorContinue(): 10 / 0 is dropped and the sequence continues.
      .map(i -> 10 / i)
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNext(10, 5)
      .expectError(IllegalStateException.class)
      .verifyThenAssertThat()
      .hasDropped(0)
      .hasDroppedErrors(1);
}

Code example source: reactor/reactor-core

@Test
public void overrideInheritedErrorStrategyInFlatMap() {
  Flux<Integer> test = Flux.just(1, 2, 3)
      .flatMap(i -> Flux.range(0, i + 1)
          .map(v -> 30 / v)
          // With STOP restored by onErrorStop() below, 30 / 0 terminates the inner Flux,
          // so onErrorReturn(100) replaces each inner sequence with a single 100.
          .onErrorReturn(100)
          .onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNext(100, 100, 100)
      .expectComplete()
      .verify();
}

Code example source: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategy() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .flatMap(f ->  Flux.range(f, 1).map(i -> 1/i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

Code example source: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategy() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Flux.range(f, 1).map(i -> 1/i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

Code example source: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategyAsync() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .flatMap(f ->  Flux.range(f, 1).publishOn(Schedulers.parallel()).map(i -> 1/i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

Code example source: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategyAsync() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Flux.range(f, 1).publishOn(Schedulers.parallel()).map(i -> 1 / i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

Code example source: mulesoft/mule

@Override
public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
 Flux<CoreEvent> schedulerTrackingPublisher = from(publisher)
   .doOnEach(signal -> signal.getContext().getOrEmpty(PROCESSOR_SCHEDULER_CONTEXT_KEY)
     .ifPresent(sch -> schedulers.add(((Scheduler) sch).getName())));
 if (getProcessingType() == CPU_LITE_ASYNC) {
  return from(schedulerTrackingPublisher).transform(processorPublisher -> Processor.super.apply(schedulerTrackingPublisher))
    .publishOn(fromExecutorService(custom)).onErrorStop();
 } else {
  return Processor.super.apply(schedulerTrackingPublisher);
 }
}
