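This article collects code examples for the Java method reactor.core.publisher.Flux.materialize() and shows how Flux.materialize() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of the Flux.materialize() method are as follows: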
Package: reactor.core.publisher
Class name: Flux
Method name: materialize
Description: Transforms incoming onNext, onError and onComplete signals into Signal instances, materializing these signals. Since the error is materialized as a Signal, its propagation is stopped and onComplete is emitted instead. A complete signal first emits a Signal.complete() and then effectively completes the flux. Each of these Signal instances has a Context associated with it.
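To make the description concrete, here is a minimal sketch of the same idea in plain Java, without Reactor. It is a simplified model, not Reactor's implementation: `MaterializeSketch`, its `Signal` class, and `Kind` are hypothetical stand-ins, an `Iterator` plays the role of the source, a thrown exception plays the role of onError, and the Context is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified, Reactor-free model of materialize(); every name here is hypothetical.
class MaterializeSketch {

    enum Kind { ON_NEXT, ON_ERROR, ON_COMPLETE }

    // Minimal stand-in for reactor.core.publisher.Signal (no Context support).
    static final class Signal<T> {
        final Kind kind;
        final T value;          // set only for ON_NEXT
        final Throwable error;  // set only for ON_ERROR

        private Signal(Kind kind, T value, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }

        static <T> Signal<T> next(T value) { return new Signal<>(Kind.ON_NEXT, value, null); }
        static <T> Signal<T> error(Throwable t) { return new Signal<>(Kind.ON_ERROR, null, t); }
        static <T> Signal<T> complete() { return new Signal<>(Kind.ON_COMPLETE, null, null); }

        @Override
        public String toString() {
            if (kind == Kind.ON_NEXT) return kind + "(" + value + ")";
            if (kind == Kind.ON_ERROR) return kind + "(" + error + ")";
            return kind.toString();
        }
    }

    // Drains the source and records every event as a Signal value.
    static <T> List<Signal<T>> materialize(Iterator<T> source) {
        List<Signal<T>> out = new ArrayList<>();
        try {
            while (source.hasNext()) {
                out.add(Signal.next(source.next()));
            }
            // Normal completion is recorded as one final Signal.
            out.add(Signal.<T>complete());
        } catch (RuntimeException e) {
            // The error becomes the last element; the method itself returns normally,
            // which mirrors "propagation is stopped and onComplete is emitted".
            out.add(Signal.<T>error(e));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(materialize(Arrays.asList("Three", "Two", "One").iterator()));
    }
}
```

With the real operator, the corresponding call is `materialize()` on a `Flux`, and the result is a `Flux<Signal<T>>` rather than a list.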
Code example source: reactor/reactor-core
@Test
public void delayErrorConcatMapVsFlatMap() {
    Function<Integer, Flux<String>> mapFunction = i -> {
        char c = (char) ('A' + i);
        return Flux.range(1, i + 1)
                .doOnNext(v -> {
                    if (i == 3 && v == 3) {
                        throw new IllegalStateException("boom " + c + v);
                    }
                })
                .map(v -> "" + c + "" + v);
    };
    Flux<Integer> source = Flux.range(0, 5);
    // Materialize both pipelines so values and the terminal error can be compared as strings.
    Flux<String> concatMap = source.concatMapDelayError(mapFunction)
            .materialize()
            .map(Object::toString);
    Flux<String> flatMap = source.flatMapDelayError(mapFunction, 2, 32)
            .materialize()
            .map(Object::toString);
    List<String> signalsConcat = concatMap.collectList().block();
    List<String> signalsFlat = flatMap.collectList().block();
    Assertions.assertThat(signalsConcat)
            .containsExactlyElementsOf(signalsFlat);
}
Code example source: reactor/reactor-core
        .doOnError(t -> concatSuppressed.addAll(
                Arrays.asList(t.getSuppressed())))
        .materialize()
        .map(Object::toString);
Flux<String> flatMap = source.flatMapDelayError(mapFunction, 2, 32)
        .doOnError(t -> flatSuppressed.addAll(
                Arrays.asList(t.getSuppressed())))
        .materialize()
        .map(Object::toString);
Code example source: reactor/reactor-core
@Test
public void materialize2() {
    StepVerifier.create(Flux.just("Three", "Two")
                    .concatWith(Flux.error(new RuntimeException("test")))
                    .materialize())
            .expectNextMatches(s -> s.isOnNext() && "Three".equals(s.get()))
            .expectNextMatches(s -> s.isOnNext() && "Two".equals(s.get()))
            // The error arrives as a Signal value, after which the sequence completes normally.
            .expectNextMatches(s -> s.isOnError() && s.getThrowable() != null
                    && "test".equals(s.getThrowable().getMessage()))
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void materialize() {
    StepVerifier.create(Flux.just("Three", "Two", "One")
                    .materialize())
            .expectNextMatches(s -> s.isOnNext() && "Three".equals(s.get()))
            .expectNextMatches(s -> s.isOnNext() && "Two".equals(s.get()))
            .expectNextMatches(s -> s.isOnNext() && "One".equals(s.get()))
            // Completion is first delivered as a Signal, then the flux itself completes.
            .expectNextMatches(Signal::isOnComplete)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void errorOnlyBackpressured() {
    // Subscribe with zero initial demand, so nothing can be delivered yet.
    AssertSubscriber<Signal<Integer>> ts = AssertSubscriber.create(0L);
    RuntimeException ex = new RuntimeException();
    Flux.<Integer>error(ex).materialize()
            .subscribe(ts);
    ts.assertNoValues()
            .assertNoError()
            .assertNotComplete();
    // A single request delivers the error Signal, followed by normal completion.
    ts.request(1);
    ts.assertValues(Signal.error(ex))
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void completeOnlyBackpressured() {
    // Subscribe with zero initial demand, so nothing can be delivered yet.
    AssertSubscriber<Signal<Integer>> ts = AssertSubscriber.create(0L);
    Flux.<Integer>empty().materialize()
            .subscribe(ts);
    ts.assertNoValues()
            .assertNoError()
            .assertNotComplete();
    // A single request delivers the completion Signal, followed by completion.
    ts.request(1);
    ts.assertValues(Signal.complete())
            .assertNoError()
            .assertComplete();
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @return
 * @see reactor.core.publisher.Flux#materialize()
 */
public final Flux<Signal<T>> materialize() {
    return boxed.materialize();
}
Code example source: io.projectreactor.addons/reactor-extra
reader.apply(key)
        .switchIfEmpty(otherSupplier.get()
                .materialize()
                .collectList()
                .flatMap(signals -> writer.apply(key, signals)
Code example source: org.mule.runtime/mule-core
/**
 * {@inheritDoc}
 */
@Override
public final Publisher<Void> error(Throwable throwable) {
    if (isResponseDone()) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(this + " error response was already completed, ignoring.");
        }
        return empty();
    }
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(this + " responseDone completed with error.");
    }
    if (throwable instanceof MessagingException) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(this + " handling messaging exception.");
        }
        return just((MessagingException) throwable)
                .flatMapMany(exceptionHandler)
                .doOnNext(handled -> success(handled))
                .doOnError(rethrown -> responseDone(left(rethrown)))
                // This ensures that both handled and rethrown outcome both result in a Publisher<Void>
                .materialize().then()
                .toProcessor();
    } else {
        responseDone(left(throwable));
        return empty();
    }
}
Code example source: io.projectreactor.addons/reactor-extra
if (fromCache == null) {
    return otherSupplier.get()
            .materialize()
            .collectList()
            .doOnNext(signals -> cacheMap.put(key, signals))
Code example source: spring-cloud/spring-cloud-commons
@SuppressWarnings("unchecked")
public CachingServiceInstanceSupplier(ServiceInstanceSupplier delegate, CacheManager cacheManager) {
    this.delegate = delegate;
    this.serviceInstances = CacheFlux.lookup(key -> {
                Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME); //TODO: configurable cache name
                List<ServiceInstance> list = cache.get(key, List.class);
                if (list == null || list.isEmpty()) {
                    return Mono.empty();
                }
                // Cache hit: expose the stored instances as a list of signals.
                return Flux.fromIterable(list)
                        .materialize()
                        .collectList();
            }, delegate.getServiceId())
            .onCacheMissResume(this.delegate::get)
            // Cache miss: turn the recorded signals back into instances and store them.
            .andWriteWith((key, signals) -> Flux.fromIterable(signals)
                    .dematerialize()
                    .cast(ServiceInstance.class)
                    .collectList()
                    .doOnNext(instances -> {
                        Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
                        cache.put(key, instances);
                    })
                    .then());
}
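The caching examples above keep a materialized sequence as a list of signals and later turn it back into a plain sequence with dematerialize(). The sketch below models that replay step in plain Java, without Reactor. `SignalCacheSketch` and `Recorded` are hypothetical names, and the behavior is a simplified assumption of how a recorded list maps back to values or an error, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Reactor-free sketch of replaying recorded signals; every name here is hypothetical.
class SignalCacheSketch {

    // Minimal stand-in for a recorded signal: a value, an error, or completion.
    static final class Recorded<T> {
        final T value;
        final RuntimeException error;
        final boolean complete;

        private Recorded(T value, RuntimeException error, boolean complete) {
            this.value = value;
            this.error = error;
            this.complete = complete;
        }

        static <T> Recorded<T> next(T value) { return new Recorded<>(value, null, false); }
        static <T> Recorded<T> error(RuntimeException e) { return new Recorded<>(null, e, false); }
        static <T> Recorded<T> complete() { return new Recorded<>(null, null, true); }
    }

    // Replays recorded signals: values are collected, a recorded error is rethrown,
    // and anything after a completion signal is ignored.
    static <T> List<T> dematerialize(List<Recorded<T>> signals) {
        List<T> values = new ArrayList<>();
        for (Recorded<T> s : signals) {
            if (s.complete) {
                break;
            }
            if (s.error != null) {
                throw s.error;
            }
            values.add(s.value);
        }
        return values;
    }

    public static void main(String[] args) {
        // A "cached" sequence: two values followed by completion.
        List<Recorded<String>> cached = new ArrayList<>();
        cached.add(Recorded.next("a"));
        cached.add(Recorded.next("b"));
        cached.add(Recorded.<String>complete());
        System.out.println(dematerialize(cached)); // prints [a, b]
    }
}
```

Storing signals rather than bare values lets a cache record how the sequence ended (completion or error) along with its elements.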