
x33g5p2x  于2022-01-19 转载在 其他  



[英]Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.


代码示例来源:origin: reactor/reactor-core

/**
 * Transform an error emitted by this {@link Flux} by synchronously applying a function
 * to it if the error matches the given type. Otherwise let the error pass through.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorMapWithClassPredicateForFlux.svg" alt="">
 * @param type the class of the exception type to react to
 * @param mapper the error transforming {@link Function}
 * @param <E> the error type
 * @return a {@link Flux} that transforms some source errors to other errors
 */
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type,
    Function<? super E, ? extends Throwable> mapper) {
  Function<Throwable, Throwable> handler = (Function<Throwable, Throwable>)mapper;
  return onErrorMap(type::isInstance, handler);

代码示例来源:origin: spring-projects/spring-framework

public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
  Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
      .onErrorMap(DecodingException.class, DECODING_MAPPER);

代码示例来源:origin: spring-projects/spring-framework

public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
  Flux<T> flux = body(BodyExtractors.toFlux(typeReference));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
      .onErrorMap(DecodingException.class, DECODING_MAPPER);

代码示例来源:origin: spring-projects/spring-framework

/**
 * Returns {@code this.content}, resolved lazily through {@code Mono.defer}.
 * If the content is already terminated it is returned as is. If no content
 * consumer has been registered, errors from the publisher (or the nested
 * publisher when the former is {@code null}) are wrapped in an
 * {@link IllegalStateException} that keeps the original error as its cause.
 * <p>NOTE(review): this excerpt appears truncated - the closing braces of the
 * {@code if} blocks, the lambda and the method, as well as whatever terminates
 * the {@code onErrorMap} chain, are not visible here. Confirm against the
 * upstream source before relying on it.
 */
public Mono<byte[]> getContent() {
  return Mono.defer(() -> {
    // Fast path: content already completed or failed.
    if (this.content.isTerminated()) {
      return this.content;
    if (!this.hasContentConsumer) {
      // Couple of possible cases:
      //  1. Mock server never consumed request body (e.g. error before read)
      //  2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
      //noinspection ConstantConditions
      (this.publisher != null ? this.publisher : this.publisherNested)
          .onErrorMap(ex -> new IllegalStateException(
              "Content has not been consumed, and " +
                  "an error was raised while attempting to produce it.", ex))
    return this.content;

代码示例来源:origin: spring-cloud/spring-cloud-gateway

public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
  Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);

代码示例来源:origin: spring-cloud/spring-cloud-gateway

public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
  Flux<T> flux = body(BodyExtractors.toFlux(typeReference));
  return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);

代码示例来源:origin: spring-projects/spring-data-redis

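/**
 * @param callback the callback applied to the commands obtained from {@code getCommands()}
 * @return a {@link Flux} of the callback's results, with errors mapped through {@code translateException()}
 */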
public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
  return getCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());

代码示例来源:origin: spring-projects/spring-data-redis

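/**
 * @param node must not be {@literal null}.
 * @param callback must not be {@literal null}.
 * @return {@link Flux} emitting execution results, or a {@link Flux} that terminates with an
 *         {@link IllegalArgumentException} when {@code node} or {@code callback} is {@literal null}
 *         (the exception is caught and emitted as an error signal rather than thrown).
 */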
public <T> Flux<T> execute(RedisNode node, LettuceReactiveCallback<T> callback) {
  try {
    Assert.notNull(node, "RedisClusterNode must not be null!");
    Assert.notNull(callback, "ReactiveCallback must not be null!");
  } catch (IllegalArgumentException e) {
    return Flux.error(e);
  return getCommands(node).flatMapMany(callback::doWithCommands).onErrorMap(translateException());

代码示例来源:origin: spring-projects/spring-data-elasticsearch

public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
  return Flux.defer(() -> callback.doWithClient(getClient())).onErrorMap(this::translateException);

代码示例来源:origin: spring-projects/spring-data-redis

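/**
 * @param callback the callback applied to the commands obtained from {@code getDedicatedCommands()}
 * @return a {@link Flux} of the callback's results, with errors mapped through {@code translateException()}
 * @since 2.0.1
 */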
public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
  return getDedicatedCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());

代码示例来源:origin: spring-projects/spring-data-redis

// Map errors from the Flux supplied by connectFunction through exceptionTranslator,
// then publish() it to obtain a ConnectableFlux.
ConnectableFlux<T> connectableFlux = connectFunction.get().onErrorMap(exceptionTranslator).publish();
// NOTE(review): excerpt is truncated here - the body of the doOnSubscribe callback
// and the rest of the statement are not visible.
Flux<T> fluxToUse = connectableFlux.doOnSubscribe(subscription -> {

代码示例来源:origin: spring-projects/spring-data-mongodb

/**
 * Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
 * @param collectionName must not be empty or {@literal null}.
 * @param callback must not be {@literal null}.
 * @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
 */
public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
  Assert.hasText(collectionName, "Collection name must not be null or empty!");
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  Mono<MongoCollection<Document>> collectionPublisher = Mono
      .fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
  return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());

代码示例来源:origin: spring-projects/spring-data-mongodb

/**
 * Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
 * {@link Flux} or to reuse the {@link Flux}.
 * @param callback must not be {@literal null}
 * @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
 */
public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  return Flux.defer(() -> callback.doInDB(prepareDatabase(doGetDatabase()))).onErrorMap(translateException());

代码示例来源:origin: reactor/reactor-core

// Demonstrates wrapping an error from callExternalService(k) in a BusinessException
// that carries the message "oops, SLA exceeded" and keeps the original as its cause.
// NOTE(review): excerpt is truncated - the source expression of the Flux, the end of
// the flatMap call, the verifier setup and the closing brace are not visible here.
public void errorHandlingRethrow2() {
  Flux<String> flux =
    .flatMap(k -> callExternalService(k)
        .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original))
        // Expect a BusinessException with that message whose cause is a TimeoutException.
        .verifyErrorMatches(e -> e instanceof BusinessException &&
            e.getMessage().equals("oops, SLA exceeded") &&
            e.getCause() instanceof TimeoutException);

代码示例来源:origin: reactor/reactor-core

// Builds a Flux that fails with TestException and maps that error type to a plain
// Exception with message "test".
// NOTE(review): excerpt is truncated - the StepVerifier expectations and the closing
// brace are not visible here.
public void mapError() {
  StepVerifier.create(Flux.<Integer>error(new TestException())
      .onErrorMap(TestException.class, e -> new Exception("test")))

代码示例来源:origin: reactor/reactor-core

// Maps any error of a failing Flux to a RuntimeException and asserts on the
// resulting message "forced failure".
// NOTE(review): excerpt is truncated - the subscribe step that connects the Flux to
// ts, the statement terminator and the closing brace are not visible here.
public void errorMap() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>error(new Exception()).onErrorMap(d -> new RuntimeException("forced" + " " + "failure"))
   .assertErrorMessage("forced failure")

代码示例来源:origin: spring-cloud/spring-cloud-gateway

// timeout(...) is given an error Mono carrying a TimeoutException as its fallback;
// that TimeoutException is then mapped to a ResponseStatusException with
// HttpStatus.GATEWAY_TIMEOUT, keeping the timeout error as the cause.
responseFlux = responseFlux
    .timeout(
        properties.getResponseTimeout(),
        Mono.error(new TimeoutException(
            "Response took longer than timeout: " + properties.getResponseTimeout())))
    .onErrorMap(
        TimeoutException.class,
        timeoutError -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, timeoutError));


/**
 * Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
 * @param collectionName must not be empty or {@literal null}.
 * @param callback must not be {@literal null}.
 * @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
 */
public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
  Assert.hasText(collectionName, "Collection name must not be null or empty!");
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  Mono<MongoCollection<Document>> collectionPublisher = Mono
      .fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
  return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());


/**
 * Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
 * {@link Flux} or to reuse the {@link Flux}.
 * @param callback must not be {@literal null}
 * @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
 */
public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
  Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
  return Flux.defer(() -> callback.doInDB(prepareDatabase(doGetDatabase()))).onErrorMap(translateException());


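/**
 * @param callback the callback applied to the commands obtained from {@code getCommands()}
 * @return a {@link Flux} of the callback's results, with errors mapped through {@code translateException()}
 */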
public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
  return getCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());




