Usage and code examples of the reactor.core.publisher.Flux.concatDelayError() method

Reposted by x33g5p2x on 2022-01-19 under Other

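This article collects code examples of the Java method reactor.core.publisher.Flux.concatDelayError() and shows how Flux.concatDelayError() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. The details of Flux.concatDelayError() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: concatDelayError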

Introduction to Flux.concatDelayError

Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.

Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
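To make the delayed-error behaviour described above concrete, here is a minimal plain-Java model of it. It is only a sketch and not Reactor's implementation: the class `DelayedErrorConcatModel`, its `Result` type, and the use of `Callable<List<T>>` as a stand-in for a source are all assumptions made for illustration. Each source is drained in order, a failing source is recorded instead of stopping the loop, and the recorded error is surfaced only after every source has had its turn. Wrapping several errors in one exception with the message "Multiple exceptions" mirrors what the `errorManyDelayed` test in the examples asserts. Attaching the originals as suppressed exceptions is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

public class DelayedErrorConcatModel {

    /** Outcome of draining all sources: the values seen plus the error held back, if any. */
    static final class Result<T> {
        final List<T> values = new ArrayList<>();
        Exception delayedError; // stays null when every source completed normally
    }

    /** Hypothetical model: each Callable stands in for one source to concatenate. */
    @SafeVarargs
    static <T> Result<T> concatDelayError(Callable<List<T>>... sources) {
        Result<T> result = new Result<>();
        List<Exception> errors = new ArrayList<>();
        for (Callable<List<T>> source : sources) {
            try {
                // "Subscribe" to the next source only after the previous one finished.
                result.values.addAll(source.call());
            } catch (Exception e) {
                // Record the failure and keep going instead of aborting the sequence.
                errors.add(e);
            }
        }
        if (errors.size() == 1) {
            result.delayedError = errors.get(0);
        } else if (errors.size() > 1) {
            // Several failures are combined into a single composite error.
            RuntimeException composite = new RuntimeException("Multiple exceptions");
            errors.forEach(composite::addSuppressed);
            result.delayedError = composite;
        }
        return result;
    }

    public static void main(String[] args) {
        Result<Integer> r = concatDelayError(
                () -> Arrays.asList(1, 2),
                () -> { throw new Exception("test"); },
                () -> Arrays.asList(3, 4));
        // prints "[1, 2, 3, 4] then error: test"
        System.out.println(r.values + " then error: " + r.delayedError.getMessage());
    }
}
```

The model is synchronous and ignores backpressure, cancellation, and prefetch, all of which the real operator handles. It is meant only to show the ordering of values relative to the error.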

Code examples

Code example source: reactor/reactor-core

/**
 * Concatenate all sources emitted as an onNext signal from a parent {@link Publisher},
 * forwarding elements emitted by the sources downstream.
 * <p>
 * Concatenation is achieved by sequentially subscribing to the first source then
 * waiting for it to complete before subscribing to the next, and so on until the
 * last source completes. Errors do not interrupt the main sequence but are propagated
 * after the rest of the sources have had a chance to be concatenated.
 * <p>
 * <img class="marble" src="doc-files/marbles/concatAsyncSources.svg" alt="">
 *
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
 *
 * @param sources The {@link Publisher} of {@link Publisher} to concatenate
 * @param <T> The type of values in both source and output sequences
 *
 * @return a new {@link Flux} concatenating all inner sources sequences, delaying errors
 */
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources) {
  return concatDelayError(sources, Queues.XS_BUFFER_SIZE);
}

Code example source: reactor/reactor-core

@Test
public void concatArrayDelayErrorWithFluxError() {
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(1, 2),
          Flux.error(new Exception("test")),
          Flux.just(3, 4)))
      .expectNext(1, 2, 3, 4)
      .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void concatArrayDelayErrorWithMonoError() {
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(1, 2),
          Mono.error(new Exception("test")),
          Flux.just(3, 4)))
      .expectNext(1, 2, 3, 4)
      .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void concatDelayErrorWithMonoError() {
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(
              Flux.just(1, 2),
              Mono.error(new Exception("test")),
              Flux.just(3, 4))))
        .expectNext(1, 2, 3, 4)
        .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void concatDelayErrorWithFluxError() {
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(
              Flux.just(1, 2),
              Flux.error(new Exception("test")),
              Flux.just(3, 4))))
        .expectNext(1, 2, 3, 4)
        .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

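@Test
public void publisherOfPublisherDelayEndError() {
  // delayUntilEnd = false: the error is held back only until the current inner
  // source terminates, so the values of the second inner source (3, 4) are not emitted.
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(
              Flux.just(1, 2).concatWith(Flux.error(new Exception("test"))),
              Flux.just(3, 4)),
          false, 128))
      .expectNext(1, 2)
      .verifyErrorMessage("test");
}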

Code example source: reactor/reactor-core

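@Test
public void publisherOfPublisherDelayEndError2() {
  // delayUntilEnd = true: the error is held back until all inner sources
  // have been consumed, so 3 and 4 are emitted before the error.
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(
              Flux.just(1, 2).concatWith(Flux.error(new Exception("test"))),
              Flux.just(3, 4)),
          true, 128))
      .expectNext(1, 2, 3, 4)
      .verifyErrorMessage("test");
}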

Code example source: reactor/reactor-core

@Test
public void publisherOfPublisherDelayError() {
  StepVerifier.create(
      Flux.concatDelayError(
          Flux.just(
              Flux.just(1, 2).concatWith(Flux.error(new Exception("test"))),
              Flux.just(3, 4))))
      .expectNext(1, 2, 3, 4)
      .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void publisherOfPublisherDelay() {
  StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2),
      Flux.just(3, 4))))
        .expectNext(1, 2, 3, 4)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void publisherOfPublisherDelayEnd() {
  StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2),
      Flux.just(3, 4)), false, 128))
        .expectNext(1, 2, 3, 4)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void publisherOfPublisherDelayEnd2() {
  StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2),
      Flux.just(3, 4)), true, 128))
        .expectNext(1, 2, 3, 4)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void errorDelayed() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.concatDelayError(
      Flux.range(1, 2),
      Flux.error(new RuntimeException("Forced failure")),
      Flux.range(3, 2))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4)
   .assertError(RuntimeException.class)
   .assertErrorMessage("Forced failure")
   .assertNotComplete();
}

Code example source: reactor/reactor-core

@Test
public void errorManyDelayed() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.concatDelayError(
      Flux.range(1, 2),
      Flux.error(new RuntimeException("Forced failure")),
      Flux.range(3, 2),
      Flux.error(new RuntimeException("Forced failure")),
      Flux.empty())
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4)
   .assertError(Throwable.class)
   .assertErrorMessage("Multiple exceptions")
   .assertNotComplete();
}

Code example source: io.scalecube/scalecube-cluster

private Mono<Void> doShutdown() {
 return Mono.defer(
   () -> {
    LOGGER.info("Cluster member {} is shutting down", localMember);
    return Flux.concatDelayError(leaveCluster(localMember), dispose(), transport.stop())
      .then()
      .doOnSuccess(avoid -> LOGGER.info("Cluster member {} has shut down", localMember));
   });
}

Code example source: scalecube/scalecube-cluster

private Mono<Void> doStop() {
 return Mono.defer(
   () -> {
    LOGGER.debug("Transport is shutting down on {}", address);
    // Complete incoming messages observable
    messageSink.complete();
    return Flux.concatDelayError(closeServer(), closeConnections())
      .doOnTerminate(loopResources::dispose)
      .then()
      .doOnSuccess(avoid -> LOGGER.debug("Transport has shut down on {}", address));
   });
}
