Usage and code examples of the reactor.core.publisher.Flux.materialize() method

x33g5p2x, reposted on 2022-01-19 under Other

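This article collects code examples for the Java method reactor.core.publisher.Flux.materialize() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as useful reference material. Details of Flux.materialize() are as follows:
Package: reactor.core.publisher
Class: Flux
Method: materialize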

About Flux.materialize

Transforms incoming onNext, onError and onComplete signals into Signal instances, materializing them as ordinary values. Because an error is materialized as a Signal, it is no longer propagated as an error: the resulting Flux emits the error Signal and then terminates with onComplete. A completion signal is handled the same way: the Flux first emits Signal.complete() and then actually completes. Every one of these Signal instances has a Context associated with it.
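The behaviour described above can be modelled without Reactor. The following is only a minimal plain-Java sketch under stated assumptions: MiniSignal, materialize(Iterator) and failingSource() are hypothetical names invented for illustration, an exception thrown by the iterator stands in for onError, and the end of the returned list stands in for the final onComplete of the materialized Flux. It is not how Reactor implements the operator, and it ignores backpressure and Context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class MaterializeSketch {

    // Hypothetical stand-in for reactor.core.publisher.Signal (not the Reactor class).
    static final class MiniSignal<T> {
        enum Kind { ON_NEXT, ON_ERROR, ON_COMPLETE }

        final Kind kind;
        final T value;          // set only for ON_NEXT
        final Throwable error;  // set only for ON_ERROR

        private MiniSignal(Kind kind, T value, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }

        static <T> MiniSignal<T> next(T value) {
            return new MiniSignal<T>(Kind.ON_NEXT, value, null);
        }

        static <T> MiniSignal<T> error(Throwable t) {
            return new MiniSignal<T>(Kind.ON_ERROR, null, t);
        }

        static <T> MiniSignal<T> complete() {
            return new MiniSignal<T>(Kind.ON_COMPLETE, null, null);
        }

        @Override
        public String toString() {
            switch (kind) {
                case ON_NEXT:  return "onNext(" + value + ")";
                case ON_ERROR: return "onError(" + error.getMessage() + ")";
                default:       return "onComplete()";
            }
        }
    }

    // Drains the source and records every event as a value.
    // A thrown exception becomes an error signal instead of escaping to the caller.
    static <T> List<MiniSignal<T>> materialize(Iterator<T> source) {
        List<MiniSignal<T>> out = new ArrayList<MiniSignal<T>>();
        try {
            while (source.hasNext()) {
                out.add(MiniSignal.<T>next(source.next()));
            }
            out.add(MiniSignal.<T>complete());
        } catch (RuntimeException e) {
            out.add(MiniSignal.<T>error(e));
        }
        return out;
    }

    // Hypothetical source that yields "Three", "Two" and then fails with "test".
    static Iterator<String> failingSource() {
        final Iterator<String> values = Arrays.asList("Three", "Two").iterator();
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public String next() {
                if (values.hasNext()) {
                    return values.next();
                }
                throw new RuntimeException("test");
            }
        };
    }

    public static void main(String[] args) {
        // prints [onNext(Three), onNext(Two), onNext(One), onComplete()]
        System.out.println(materialize(Arrays.asList("Three", "Two", "One").iterator()));
        // prints [onNext(Three), onNext(Two), onError(test)]
        System.out.println(materialize(failingSource()));
    }
}
```

In both runs the caller receives a normal list rather than an exception, which mirrors why the materialized Flux in the tests below can be verified with verifyComplete() even when the source fails.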

Code examples

Example source: reactor/reactor-core

@Test
public void delayErrorConcatMapVsFlatMap() {
  Function<Integer, Flux<String>> mapFunction = i -> {
    char c = (char) ('A' + i);
    return Flux.range(1, i + 1)
        .doOnNext(v -> {
          if (i == 3 && v == 3) {
            throw new IllegalStateException("boom " + c + v);
          }
        })
        .map(v -> "" + c + "" + v);
  };
  Flux<Integer> source = Flux.range(0, 5);
  // materialize() turns the delayed error into a Signal value, so both
  // signal sequences can be collected and compared as plain strings.
  Flux<String> concatMap = source.concatMapDelayError(mapFunction)
      .materialize()
      .map(Object::toString);
  Flux<String> flatMap = source.flatMapDelayError(mapFunction, 2, 32)
      .materialize()
      .map(Object::toString);
  List<String> signalsConcat = concatMap.collectList().block();
  List<String> signalsFlat = flatMap.collectList().block();
  Assertions.assertThat(signalsConcat)
      .containsExactlyElementsOf(signalsFlat);
}

Example source: reactor/reactor-core

// ... (excerpt starts mid-chain)
    .doOnError(t -> concatSuppressed.addAll(
        Arrays.asList(t.getSuppressed())))
    .materialize()
    .map(Object::toString);
Flux<String> flatMap = source.flatMapDelayError(mapFunction, 2, 32)
    .doOnError(t -> flatSuppressed.addAll(
        Arrays.asList(t.getSuppressed())))
    .materialize()
    .map(Object::toString);

Example source: reactor/reactor-core

@Test
public void materialize2() {
  StepVerifier.create(Flux.just("Three", "Two")
              .concatWith(Flux.error(new RuntimeException("test")))
              .materialize())
        .expectNextMatches(s -> s.isOnNext() && "Three".equals(s.get()))
        .expectNextMatches(s -> s.isOnNext() && "Two".equals(s.get()))
        .expectNextMatches(s -> s.isOnError() && s.getThrowable() != null
            && "test".equals(s.getThrowable().getMessage()))
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void materialize() {
  StepVerifier.create(Flux.just("Three", "Two", "One")
              .materialize())
        .expectNextMatches(s -> s.isOnNext() && "Three".equals(s.get()))
        .expectNextMatches(s -> s.isOnNext() && "Two".equals(s.get()))
        .expectNextMatches(s -> s.isOnNext() && "One".equals(s.get()))
        .expectNextMatches(Signal::isOnComplete)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void errorOnlyBackpressured() {
  AssertSubscriber<Signal<Integer>> ts = AssertSubscriber.create(0L);
  RuntimeException ex = new RuntimeException();
  Flux.<Integer>error(ex).materialize()
              .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(1);
  ts.assertValues(Signal.error(ex))
   .assertNoError()
   .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void completeOnlyBackpressured() {
  AssertSubscriber<Signal<Integer>> ts = AssertSubscriber.create(0L);
  Flux.<Integer>empty().materialize()
             .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(1);
  ts.assertValues(Signal.complete())
   .assertNoError()
   .assertComplete();
}

Example source: com.aol.cyclops/cyclops-reactor

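/**
 * Delegates to the wrapped Flux.
 *
 * @return a Flux that emits the source's onNext, onError and onComplete signals as Signal values
 * @see reactor.core.publisher.Flux#materialize()
 */
public final Flux<Signal<T>> materialize() {
  return boxed.materialize();
}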

Example source: io.projectreactor.addons/reactor-extra

reader.apply(key)
 .switchIfEmpty(otherSupplier.get()
               .materialize()
               .collectList()
               .flatMap(signals -> writer.apply(key, signals)

Example source: org.mule.runtime/mule-core

/**
 * {@inheritDoc}
 */
@Override
public final Publisher<Void> error(Throwable throwable) {
 if (isResponseDone()) {
  if (LOGGER.isDebugEnabled()) {
   LOGGER.debug(this + " error response was already completed, ignoring.");
  }
  return empty();
 }
 if (LOGGER.isDebugEnabled()) {
  LOGGER.debug(this + " responseDone completed with error.");
 }
 if (throwable instanceof MessagingException) {
  if (LOGGER.isDebugEnabled()) {
   LOGGER.debug(this + " handling messaging exception.");
  }
  return just((MessagingException) throwable)
    .flatMapMany(exceptionHandler)
    .doOnNext(handled -> success(handled))
    .doOnError(rethrown -> responseDone(left(rethrown)))
    // This ensures that both the handled and the rethrown outcomes result in a Publisher<Void>
    .materialize().then()
    .toProcessor();
 } else {
  responseDone(left(throwable));
  return empty();
 }
}

Example source: io.projectreactor.addons/reactor-extra

if (fromCache == null) {
  return otherSupplier.get()
            .materialize()
            .collectList()
            .doOnNext(signals -> cacheMap.put(key, signals))

Example source: spring-cloud/spring-cloud-commons

@SuppressWarnings("unchecked")
public CachingServiceInstanceSupplier(ServiceInstanceSupplier delegate, CacheManager cacheManager) {
  this.delegate = delegate;
  this.serviceInstances = CacheFlux.lookup(key -> {
    Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME); //TODO: configurable cache name
    List<ServiceInstance> list = cache.get(key, List.class);
    if (list == null || list.isEmpty()) {
      return Mono.empty();
    }
    return Flux.fromIterable(list)
        .materialize()
        .collectList();
  }, delegate.getServiceId())
      .onCacheMissResume(this.delegate::get)
      .andWriteWith((key, signals) -> Flux.fromIterable(signals)
          .dematerialize()
          .cast(ServiceInstance.class)
          .collectList()
          .doOnNext(instances -> {
            Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
            cache.put(key, instances);
          })
          .then());
}
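In the example above, the cache itself stores plain values, while the lookup callback returns a list of signals and the write callback receives one, so materialize() and dematerialize() convert at the two edges. The following plain-Java sketch models that round trip under stated assumptions: MiniSignal, read and write are hypothetical names, a HashMap stands in for the Spring Cache, a null return stands in for Mono.empty(), and only onNext and onComplete are modelled (a real dematerialize() would also turn an error Signal back into onError).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SignalCacheSketch {

    // Hypothetical stand-in for reactor.core.publisher.Signal; error signals are not modelled.
    static final class MiniSignal<T> {
        final T value;           // payload of an onNext signal, null for onComplete
        final boolean complete;  // true only for the onComplete signal

        MiniSignal(T value, boolean complete) {
            this.value = value;
            this.complete = complete;
        }
    }

    // Models materialize().collectList() for a source that completes normally.
    static <T> List<MiniSignal<T>> materialize(List<T> values) {
        List<MiniSignal<T>> signals = new ArrayList<MiniSignal<T>>();
        for (T v : values) {
            signals.add(new MiniSignal<T>(v, false));
        }
        signals.add(new MiniSignal<T>(null, true));
        return signals;
    }

    // Models dematerialize(): unwrap onNext payloads and stop at onComplete.
    static <T> List<T> dematerialize(List<MiniSignal<T>> signals) {
        List<T> values = new ArrayList<T>();
        for (MiniSignal<T> s : signals) {
            if (s.complete) {
                break;
            }
            values.add(s.value);
        }
        return values;
    }

    // Reader side: a missing or empty entry is a cache miss (null models Mono.empty()).
    static List<MiniSignal<String>> read(Map<String, List<String>> cache, String key) {
        List<String> hit = cache.get(key);
        return (hit == null || hit.isEmpty()) ? null : materialize(hit);
    }

    // Writer side: store the plain values recovered from the signals.
    static void write(Map<String, List<String>> cache, String key, List<MiniSignal<String>> signals) {
        cache.put(key, dematerialize(signals));
    }

    public static void main(String[] args) {
        Map<String, List<String>> cache = new HashMap<String, List<String>>();
        System.out.println(read(cache, "service-a") == null);  // prints true (cache miss)
        write(cache, "service-a", materialize(Arrays.asList("host-1", "host-2")));
        System.out.println(cache.get("service-a"));            // prints [host-1, host-2]
        System.out.println(read(cache, "service-a").size());   // prints 3 (two onNext plus onComplete)
    }
}
```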
