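This article collects code examples for the Java method reactor.core.publisher.Flux.onErrorMap() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from a selection of open-source projects, and are intended as a practical reference. Details of Flux.onErrorMap() are as follows: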
Package: reactor.core.publisher
Class name: Flux
Method name: onErrorMap
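Description: Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through. This wording describes the onErrorMap(Class, Function) overload; the examples below also use onErrorMap(Function), which maps every error, and onErrorMap(Predicate, Function), which maps errors accepted by a predicate.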
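To make the "matches the given type, otherwise pass through" behaviour concrete, here is a minimal sketch. The class name `OnErrorMapByTypeSketch` and the helper `terminalError` are made up for illustration and are not part of Reactor; only `Flux.error`, `Flux.onErrorMap`, and `Flux.subscribe` are library calls.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not taken from any of the projects quoted in this article.
final class OnErrorMapByTypeSketch {

    // Converts IOException into UncheckedIOException; any other error type passes through.
    static <T> Flux<T> mapIoErrors(Flux<T> source) {
        return source.onErrorMap(IOException.class, UncheckedIOException::new);
    }

    // Subscribes and returns the error the sequence ended with (null if there was none).
    // Assumption: the source is synchronous, as Flux.error(...) is.
    static Throwable terminalError(Flux<?> flux) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        flux.subscribe(value -> { }, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable mapped = terminalError(
                mapIoErrors(Flux.<String>error(new IOException("disk"))));
        Throwable untouched = terminalError(
                mapIoErrors(Flux.<String>error(new IllegalStateException("other"))));

        System.out.println(mapped.getClass().getSimpleName());    // prints "UncheckedIOException"
        System.out.println(untouched.getClass().getSimpleName()); // prints "IllegalStateException"
    }
}
```

The original IOException stays reachable through `getCause()`, which is usually what you want when translating exceptions.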
Code example source: reactor/reactor-core
/**
 * Transform an error emitted by this {@link Flux} by synchronously applying a function
 * to it if the error matches the given type. Otherwise let the error pass through.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorMapWithClassPredicateForFlux.svg" alt="">
 *
 * @param type the class of the exception type to react to
 * @param mapper the error transforming {@link Function}
 * @param <E> the error type
 *
 * @return a {@link Flux} that transforms some source errors to other errors
 */
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type,
        Function<? super E, ? extends Throwable> mapper) {
    @SuppressWarnings("unchecked")
    Function<Throwable, Throwable> handler = (Function<Throwable, Throwable>)mapper;
    return onErrorMap(type::isInstance, handler);
}
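The implementation above delegates to the predicate-based overload, `onErrorMap(Predicate, Function)`. That overload can also be called directly when the decision depends on something other than the error's class. The following sketch assumes a made-up rule (messages starting with "transient"); the class and helper names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the "transient" message convention is an assumption for illustration.
final class OnErrorMapByPredicateSketch {

    // Rewraps only errors whose message starts with "transient"; all others pass through.
    static <T> Flux<T> mapTransientErrors(Flux<T> source) {
        return source.onErrorMap(
                error -> error.getMessage() != null && error.getMessage().startsWith("transient"),
                error -> new IllegalStateException("retry later", error));
    }

    // Subscribes and returns the terminal error (null if none); for synchronous sources only.
    static Throwable terminalError(Flux<?> flux) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        flux.subscribe(value -> { }, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable mapped = terminalError(mapTransientErrors(
                Flux.<Integer>error(new IllegalArgumentException("transient: connection reset"))));
        Throwable untouched = terminalError(mapTransientErrors(
                Flux.<Integer>error(new IllegalArgumentException("bad input"))));

        System.out.println(mapped.getMessage());    // prints "retry later"
        System.out.println(untouched.getMessage()); // prints "bad input"
    }
}
```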
Code example source: spring-projects/spring-framework
@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
    Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
    return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
            .onErrorMap(DecodingException.class, DECODING_MAPPER);
}
Code example source: spring-projects/spring-framework
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
    Flux<T> flux = body(BodyExtractors.toFlux(typeReference));
    return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER)
            .onErrorMap(DecodingException.class, DECODING_MAPPER);
}
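The two Spring examples above chain one type-based `onErrorMap` per exception type. A self-contained sketch of the same chaining pattern follows, using JDK exceptions instead of Spring's types; the class name, helper, and messages are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

// Hypothetical demo class showing one onErrorMap per source exception type.
final class ChainedOnErrorMapSketch {

    // Each operator reacts to its own exception type and ignores everything else.
    static <T> Flux<T> translate(Flux<T> source) {
        return source
                .onErrorMap(NumberFormatException.class,
                        e -> new IllegalArgumentException("input is not a number", e))
                .onErrorMap(ArithmeticException.class,
                        e -> new IllegalStateException("calculation failed", e));
    }

    // Subscribes and returns the terminal error (null if none); for synchronous sources only.
    static Throwable terminalError(Flux<?> flux) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        flux.subscribe(value -> { }, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        // "x" cannot be parsed, so map() signals a NumberFormatException.
        Throwable parseError = terminalError(translate(Flux.just("1", "x").map(Integer::parseInt)));
        // Dividing by zero signals an ArithmeticException.
        Throwable mathError = terminalError(translate(Flux.just(1, 0).map(i -> 10 / i)));

        System.out.println(parseError.getMessage()); // prints "input is not a number"
        System.out.println(mathError.getMessage());  // prints "calculation failed"
    }
}
```

Because each operator sees whatever error reaches it, a later `onErrorMap` in the chain would also re-map the output of an earlier one if that output matched its type, so the order of the chain can matter.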
Code example source: spring-projects/spring-framework
public Mono<byte[]> getContent() {
    return Mono.defer(() -> {
        if (this.content.isTerminated()) {
            return this.content;
        }
        if (!this.hasContentConsumer) {
            // Couple of possible cases:
            // 1. Mock server never consumed request body (e.g. error before read)
            // 2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
            //noinspection ConstantConditions
            (this.publisher != null ? this.publisher : this.publisherNested)
                    .onErrorMap(ex -> new IllegalStateException(
                            "Content has not been consumed, and " +
                            "an error was raised while attempting to produce it.", ex))
                    .subscribe();
        }
        return this.content;
    });
}
Code example source: spring-cloud/spring-cloud-gateway
@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
    Flux<T> flux = body(BodyExtractors.toFlux(elementClass));
    return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}
Code example source: spring-cloud/spring-cloud-gateway
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
    Flux<T> flux = body(BodyExtractors.toFlux(typeReference));
    return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER);
}
Code example source: spring-projects/spring-data-redis
/**
 * @param callback
 * @return
 */
public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
    return getCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}
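The snippet above passes the result of a `translateException()` method to the single-argument `onErrorMap(Function)`. The real Spring Data implementation is not shown here, so the following is only a sketch of the general pattern: a reusable `Function<Throwable, Throwable>` that wraps the errors it recognises and returns everything else unchanged. `DataAccessFailure`, the recognised exception type, and all method bodies are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import reactor.core.publisher.Flux;

// Hypothetical demo class; it does not reproduce Spring Data's actual translation logic.
final class ExceptionTranslatorSketch {

    // Made-up domain exception standing in for a framework-specific hierarchy.
    static final class DataAccessFailure extends RuntimeException {
        DataAccessFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Reusable mapper: recognised errors are wrapped, everything else is returned as-is.
    static Function<Throwable, Throwable> translateException() {
        return error -> error instanceof UnsupportedOperationException
                ? new DataAccessFailure("command not supported", error)
                : error;
    }

    // Applies the translator to every error emitted by the given sequence.
    static <T> Flux<T> execute(Flux<T> commands) {
        return commands.onErrorMap(translateException());
    }

    // Subscribes and returns the terminal error (null if none); for synchronous sources only.
    static Throwable terminalError(Flux<?> flux) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        flux.subscribe(value -> { }, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable translated = terminalError(
                execute(Flux.<String>error(new UnsupportedOperationException("SCAN"))));
        Throwable unchanged = terminalError(
                execute(Flux.<String>error(new IllegalArgumentException("bad key"))));

        System.out.println(translated.getClass().getSimpleName()); // prints "DataAccessFailure"
        System.out.println(unchanged.getClass().getSimpleName());  // prints "IllegalArgumentException"
    }
}
```

Returning the original error from the mapper is what lets one translator be attached to every pipeline without hiding unrelated failures.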
Code example source: spring-projects/spring-data-redis
/**
 * @param node must not be {@literal null}.
 * @param callback must not be {@literal null}.
 * @throws IllegalArgumentException when {@code node} or {@code callback} is {@literal null}.
 * @return {@link Flux} emitting execution results.
 */
public <T> Flux<T> execute(RedisNode node, LettuceReactiveCallback<T> callback) {
    try {
        Assert.notNull(node, "RedisClusterNode must not be null!");
        Assert.notNull(callback, "ReactiveCallback must not be null!");
    } catch (IllegalArgumentException e) {
        return Flux.error(e);
    }
    return getCommands(node).flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}
Code example source: spring-projects/spring-data-elasticsearch
@Override
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
    return Flux.defer(() -> callback.doWithClient(getClient())).onErrorMap(this::translateException);
}
Code example source: spring-projects/spring-data-redis
/**
 * @param callback
 * @return
 * @since 2.0.1
 */
public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
    return getDedicatedCommands().flatMapMany(callback::doWithCommands).onErrorMap(translateException());
}
Code example source: spring-projects/spring-data-redis
ConnectableFlux<T> connectableFlux = connectFunction.get().onErrorMap(exceptionTranslator).publish();
Flux<T> fluxToUse = connectableFlux.doOnSubscribe(subscription -> {
Code example source: spring-projects/spring-data-mongodb
/**
 * Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
 *
 * @param collectionName must not be empty or {@literal null}.
 * @param callback must not be {@literal null}.
 * @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
 */
public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
    Assert.hasText(collectionName, "Collection name must not be null or empty!");
    Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
    Mono<MongoCollection<Document>> collectionPublisher = Mono
            .fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
    return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
}
Code example source: spring-projects/spring-data-mongodb
/**
 * Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
 * {@link Flux} or to reuse the {@link Flux}.
 *
 * @param callback must not be {@literal null}
 * @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
 */
public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
    Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!");
    return Flux.defer(() -> callback.doInDB(prepareDatabase(doGetDatabase()))).onErrorMap(translateException());
}
Code example source: reactor/reactor-core
@Test
public void errorHandlingRethrow2() {
    Flux<String> flux =
            Flux.just("timeout1")
                .flatMap(k -> callExternalService(k)
                        .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original))
                );
    StepVerifier.create(flux)
                .verifyErrorMatches(e -> e instanceof BusinessException &&
                        e.getMessage().equals("oops, SLA exceeded") &&
                        e.getCause() instanceof TimeoutException);
}
Code example source: reactor/reactor-core
@Test
public void mapError() {
    StepVerifier.create(Flux.<Integer>error(new TestException())
                            .onErrorMap(TestException.class, e -> new Exception("test")))
                .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void errorMap() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>error(new Exception()).onErrorMap(d -> new RuntimeException("forced" + " " + "failure"))
        .subscribe(ts);
    ts.assertNoValues()
      .assertError()
      .assertErrorMessage("forced failure")
      .assertNotComplete();
}
Code example source: spring-cloud/spring-cloud-gateway
responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
        Mono.error(new TimeoutException("Response took longer than timeout: " +
                properties.getResponseTimeout())))
        .onErrorMap(TimeoutException.class,
                th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, th));
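The gateway fragment above combines `timeout(Duration, Publisher)` with a type-based `onErrorMap`. A runnable sketch of that combination, without the Spring types, might look like the following. `UpstreamTimeoutException`, `withTimeout`, and the 50 ms limit are assumptions chosen for the demo, not part of Spring Cloud Gateway.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class modelled loosely on the fragment above.
final class TimeoutMappingSketch {

    // Made-up exception standing in for an HTTP-level "gateway timeout" error.
    static final class UpstreamTimeoutException extends RuntimeException {
        UpstreamTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // If no item arrives within the limit, switch to a failing fallback,
    // then translate that TimeoutException into the domain exception.
    static <T> Flux<T> withTimeout(Flux<T> response, Duration limit) {
        return response
                .timeout(limit, Mono.error(
                        new TimeoutException("Response took longer than timeout: " + limit)))
                .onErrorMap(TimeoutException.class,
                        cause -> new UpstreamTimeoutException("upstream did not answer in time", cause));
    }

    public static void main(String[] args) {
        try {
            // Flux.never() emits nothing, so the 50 ms timeout always fires.
            withTimeout(Flux.<String>never(), Duration.ofMillis(50))
                    .blockLast(Duration.ofSeconds(5));
        } catch (UpstreamTimeoutException e) {
            // prints "upstream did not answer in time / cause: TimeoutException"
            System.out.println(e.getMessage() + " / cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

A source that emits in time is unaffected, since `onErrorMap` only acts on error signals.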