reactor.core.publisher.Flux.onErrorContinue()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(308)

本文整理了Java中reactor.core.publisher.Flux.onErrorContinue()方法的一些代码示例,展示了Flux.onErrorContinue()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.onErrorContinue()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:onErrorContinue

Flux.onErrorContinue介绍

[英]Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specified type are recovered from. The recovered error and associated value are notified via the provided BiConsumer.

Note that this error handling mode is not necessarily implemented by all operators (look for the Error Mode Support javadoc section to find operators that support it).
[中]通过从序列中删除有罪的元素并继续后续元素,让上游的兼容操作符从错误中恢复。仅从中恢复与指定类型匹配的错误。通过提供的双消费者通知恢复的错误和相关值。
请注意,此错误处理模式不一定由所有操作员实现(请查找error mode Support javadoc部分以查找支持它的操作员)。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Let compatible operators <strong>upstream</strong> recover from errors by dropping the
 * incriminating element from the sequence and continuing with subsequent elements.
 * Only errors matching the specified {@code type} are recovered from.
 * The recovered error and associated value are notified via the provided {@link BiConsumer}.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorContinueWithClassPredicate.svg" alt="">
 * <p>
 * Note that this error handling mode is not necessarily implemented by all operators
 * (look for the {@code Error Mode Support} javadoc section to find operators that
 * support it).
 *
 * @return a {@link Flux} that attempts to continue processing on some errors.
 */
public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type, BiConsumer<Throwable, Object> errorConsumer) {
  return onErrorContinue(type::isInstance, errorConsumer);
}

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
              .log(log.getName(), Level.FINEST)
              .doOnSubscribe(s -> log.debug("Started reminders"))
              .flatMap(i -> this.sendReminders())
              .onErrorContinue((ex, value) -> log.warn(
                "Unexpected error while sending reminders",
                ex
              ))
              .subscribe();
}

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  subscription = Flux.from(publisher)
            .log(log.getName(), Level.FINEST)
            .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
            .ofType(eventType)
            .cast(eventType)
            .compose(this::handle)
            .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
            .subscribe();
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public void start() {
  super.start();
  intervalSubscription = Flux.interval(updateInterval)
                .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
                .log(log.getName(), Level.FINEST)
                .subscribeOn(Schedulers.newSingle("status-monitor"))
                .concatMap(i -> this.updateStatusForAllInstances())
                .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
                  ex
                ))
                .subscribe();
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
  return Flux.fromIterable(hosts()) //
      .filter(entry -> state == null || entry.getState().equals(state)) //
      .map(ElasticsearchHost::getEndpoint) //
      .flatMap(host -> {
        Mono<ClientResponse> exchange = createWebClient(host) //
            .head().uri("/").exchange().doOnError(throwable -> {
              hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
              clientProvider.getErrorListener().accept(throwable);
            });
        return Mono.just(host).zipWith(exchange);
      }) //
      .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorStrategyConfiguredInFlatMapDoesNotLeak() {
  @SuppressWarnings("divzero")
  Flux<Integer> test = Flux.just(0, 1, 2)
      .map(i -> i / 0)
      .flatMap(i -> Flux.just(i).onErrorContinue(OnNextFailureStrategyTest::drop));
  StepVerifier.create(test)
      .expectError(ArithmeticException.class)
      .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void monoApiWithinFlatMap() {
  Flux<Integer> test = Flux.just(0, 1, 2, 3)
      .flatMap(i -> Mono.just(i).map(v -> 30 / v))
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNext(30, 15, 10)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDroppedExactly(0)
      .hasDroppedErrorsSatisfying(
          errors -> assertThat(errors)
              .hasSize(1)
              .allMatch(e -> e instanceof ArithmeticException));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorMono() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .flatMap(f ->  Mono.just(f).map(i -> 1/i))
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(0)
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorMono() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Mono.just(f).map(i -> 1/i))
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(0)
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void overrideInheritedErrorStrategyInFlatMap() {
  Flux<Integer> test = Flux.just(1, 2, 3)
      .flatMap(i -> Flux.range(0, i + 1)
          .map(v -> 30 / v)
          .onErrorReturn(100)
          .onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNext(100, 100, 100)
      .expectComplete()
      .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxApiWithinFlatMap() {
  Flux<Integer> test = Flux.just(1, 2, 3)
               .flatMap(i -> Flux.range(0, i + 1)
                        .map(v -> 30 / v))
               .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
        .expectNext(30, 30, 15, 30, 15, 10)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDroppedExactly(0, 0, 0)
        .hasDroppedErrorsSatisfying(
            errors -> assertThat(errors)
                .hasSize(3)
                .allMatch(e -> e instanceof ArithmeticException));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalError() {
  Flux<Integer> test = Flux
      .just(1, 2)
      .hide()
      .flatMap(f -> {
        if(f == 1){
          return Mono.error(new NullPointerException());
        }
        else {
          return Mono.just(f);
        }
      })
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(2)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(1)
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void errorModeContinueInternalErrorMonoAsync() {
    Flux<Integer> test = Flux
        .just(0, 1)
        .hide()
        .flatMap(f ->  Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1/i))
        .onErrorContinue(OnNextFailureStrategyTest::drop);

    StepVerifier.create(test)
        .expectNoFusionSupport()
        .expectNext(1)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped(0)
        .hasDroppedErrors(1);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorMonoAsync() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1/i))
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(0)
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategy() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .flatMap(f ->  Flux.range(f, 1).map(i -> 1/i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategy() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Flux.range(f, 1).map(i -> 1/i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueNullPublisher() {
  Flux<Integer> test = Flux
      .just(1, 2)
      .hide()
      .<Integer>concatMap(f -> null)
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(1, 2)
      .hasDroppedErrors(2);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueNullPublisher() {
  Flux<Integer> test = Flux
      .just(1, 2)
      .hide()
      .<Integer>flatMap(f -> null)
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
        .expectNoFusionSupport()
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped(1, 2)
        .hasDroppedErrors(2);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategyAsync() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .flatMap(f ->  Flux.range(f, 1).publishOn(Schedulers.parallel()).map(i -> 1/i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueInternalErrorStopStrategyAsync() {
  Flux<Integer> test = Flux
      .just(0, 1)
      .hide()
      .concatMap(f ->  Flux.range(f, 1).publishOn(Schedulers.parallel()).map(i -> 1 / i).onErrorStop())
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(1)
      .expectComplete()
      .verifyThenAssertThat()
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

相关文章

微信公众号

最新文章

更多

Flux类方法