reactor.core.publisher.Flux.onErrorMap()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(124)

本文整理了Java中reactor.core.publisher.Flux.onErrorMap()方法的一些代码示例,展示了Flux.onErrorMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.onErrorMap()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:onErrorMap

Flux.onErrorMap介绍

[英]Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
[中]如果错误与给定类型匹配,则通过同步应用函数来转换此流量发出的错误。否则,让错误通过。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Transform an error emitted by this {@link Flux} by synchronously applying a function
 * to it if the error matches the given type. Otherwise let the error pass through.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorMapWithClassPredicateForFlux.svg" alt="">
 *
 * @param type the class of the exception type to react to
 * @param mapper the error transforming {@link Function}
 * @param <E> the error type
 *
 * @return a {@link Flux} that transforms some source errors to other errors
 */
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type,
    Function<? super E, ? extends Throwable> mapper) {
  @SuppressWarnings("unchecked")
  Function<Throwable, Throwable> handler = (Function<Throwable, Throwable>)mapper;
  return onErrorMap(type::isInstance, handler);
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
  Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
      .onErrorMap(DecodingException.class, DECODING_MAPPER);
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
  Flux<T> flux = body(BodyExtractors.toFlux(typeReference));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
      .onErrorMap(DecodingException.class, DECODING_MAPPER);
}

代码示例来源:origin: spring-projects/spring-framework

public Mono<byte[]> getContent() {
  return Mono.defer(() -> {
    if (this.content.isTerminated()) {
      return this.content;
    }
    if (!this.hasContentConsumer) {
      // Couple of possible cases:
      //  1. Mock server never consumed request body (e.g. error before read)
      //  2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
      //noinspection ConstantConditions
      (this.publisher != null ? this.publisher : this.publisherNested)
          .onErrorMap(ex -> new IllegalStateException(
              "Content has not been consumed, and " +
                  "an error was raised while attempting to produce it.", ex))
          .subscribe();
    }
    return this.content;
  });
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
  Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
  Flux<T> flux = body(BodyExtractors.toFlux(typeReference));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * @param callback
 * @return
 */
public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
  return getCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * @param node must not be {@literal null}.
 * @param callback must not be {@literal null}.
 * @throws IllegalArgumentException when {@code node} or {@code callback} is {@literal null}.
 * @return {@link Flux} emitting execution results.
 */
public <T> Flux<T> execute(RedisNode node, LettuceReactiveCallback<T> callback) {
  try {
    Assert.notNull(node, "RedisClusterNode must not be null!");
    Assert.notNull(callback, "ReactiveCallback must not be null!");
  } catch (IllegalArgumentException e) {
    return Flux.error(e);
  }
  return getCommands(node).flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

@Override
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
  return Flux.defer(() -> callback.doWithClient(getClient())).onErrorMap(this::translateException);
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * @param callback
 * @return
 * @since 2.0.1
 */
public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
  return getDedicatedCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}

代码示例来源:origin: spring-projects/spring-data-redis

ConnectableFlux<T> connectableFlux = connectFunction.get().onErrorMap(exceptionTranslator).publish();
Flux<T> fluxToUse = connectableFlux.doOnSubscribe(subscription -> {

代码示例来源:origin: spring-projects/spring-data-mongodb

/**
 * Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
 *
 * @param collectionName must not be empty or {@literal null}.
 * @param callback must not be {@literal null}.
 * @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
 */
public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
  Assert.hasText(collectionName, "Collection name must not be null or empty!");
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  Mono<MongoCollection<Document>> collectionPublisher = Mono
      .fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
  return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
}

代码示例来源:origin: spring-projects/spring-data-mongodb

/**
 * Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
 * {@link Flux} or to reuse the {@link Flux}.
 *
 * @param callback must not be {@literal null}
 * @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
 */
public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  return Flux.defer(() -> callback.doInDB(prepareDatabase(doGetDatabase()))).onErrorMap(translateException());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorHandlingRethrow2() {
  Flux<String> flux =
  Flux.just("timeout1")
    .flatMap(k -> callExternalService(k)
        .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original))
    );
  StepVerifier.create(flux)
        .verifyErrorMatches(e -> e instanceof BusinessException &&
            e.getMessage().equals("oops, SLA exceeded") &&
            e.getCause() instanceof TimeoutException);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void mapError() {
  StepVerifier.create(Flux.<Integer>error(new TestException())
      .onErrorMap(TestException.class, e -> new Exception("test")))
      .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorMap() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>error(new Exception()).onErrorMap(d -> new RuntimeException("forced" + " " + "failure"))
                    .subscribe(ts);
  ts.assertNoValues()
   .assertError()
   .assertErrorMessage("forced failure")
   .assertNotComplete();
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
    Mono.error(new TimeoutException("Response took longer than timeout: " +
        properties.getResponseTimeout()))).onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, th));

代码示例来源:origin: org.springframework.data/spring-data-mongodb

/**
 * Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
 *
 * @param collectionName must not be empty or {@literal null}.
 * @param callback must not be {@literal null}.
 * @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
 */
public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
  Assert.hasText(collectionName, "Collection name must not be null or empty!");
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  Mono<MongoCollection<Document>> collectionPublisher = Mono
      .fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
  return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
}

代码示例来源:origin: org.springframework.data/spring-data-mongodb

/**
 * Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
 * {@link Flux} or to reuse the {@link Flux}.
 *
 * @param callback must not be {@literal null}
 * @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
 */
public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  return Flux.defer(() -> callback.doInDB(prepareDatabase(doGetDatabase()))).onErrorMap(translateException());
}

代码示例来源:origin: org.springframework.data/spring-data-redis

/**
 * @param callback
 * @return
 */
public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
  return getCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}

相关文章

微信公众号

最新文章

更多

Flux类方法