reactor.core.publisher.Mono.doFinally()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(1098)

本文整理了Java中reactor.core.publisher.Mono.doFinally()方法的一些代码示例,展示了Mono.doFinally()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.doFinally()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:doFinally

Mono.doFinally介绍

[英]Add behavior triggering after the Mono terminates for any reason, including cancellation. The terminating event ( SignalType#ON_COMPLETE, SignalType#ON_ERROR and SignalType#CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.

Note that the fact that the signal is propagated downstream before the callback is executed means that several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback please keep in mind that the Mono will complete before it is executed, so its effect might not be visible immediately after eg. a #block().
[中]在Mono因任何原因(包括取消)终止后添加行为触发。终止事件(SignalType#ON#u COMPLETE、SignalType#ON#u ERROR和SignalType#CANCEL)被传递给消费者,在信号被传递到下游后执行。
请注意,在执行回调之前,信号向下游传播这一事实意味着一行中的几个doFinally将以相反的顺序执行。如果您想断言回调的执行,请记住Mono将在执行之前完成,因此它的效果可能不会在执行后立即可见,例如a#block()。

代码示例

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Void> updateStatus(InstanceId instanceId) {
  return statusUpdater.updateStatus(instanceId).doFinally(s -> lastQueried.put(instanceId, Instant.now()));
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void nullCallback() {
  Mono.just(1).doFinally(null);
}

代码示例来源:origin: lettuce-io/lettuce-core

private Mono<RedisNodeDescription> getNodeDescription(List<StatefulRedisConnection<String, String>> connections,
    RedisURI uri) {
  return Mono.fromCompletionStage(redisClient.connectAsync(StringCodec.UTF8, uri)) //
      .onErrorResume(t -> {
        logger.warn("Cannot connect to {}", uri, t);
        return Mono.empty();
      }) //
      .doOnNext(connections::add) //
      .flatMap(connection -> {
        Mono<RedisNodeDescription> instance = getNodeDescription(uri, connection);
        return instance.flatMap(it -> ResumeAfter.close(connection).thenEmit(it)).doFinally(s -> {
          connections.remove(connection);
        });
      });
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void sessionClosing() throws Exception {
  this.client.execute(getUrl("/close"),
      session -> {
        logger.debug("Starting..");
        return session.receive()
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doFinally(signalType -> {
              logger.debug("Completed with: " + signalType);
            });
      })
      .block(TIMEOUT);
}

代码示例来源:origin: lettuce-io/lettuce-core

public <T> Mono<T> thenEmit(T value) {
  return Mono.defer(() -> {
    if (firstCloseLatch()) {
      return Mono.fromCompletionStage(closeable.closeAsync());
    }
    return Mono.empty();
  }).then(Mono.just(value)).doFinally(s -> {
    if (firstCloseLatch()) {
      closeable.closeAsync();
    }
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void disposeCancelsBoth() {
  AtomicReference<SignalType> s1 = new AtomicReference<>();
  AtomicReference<SignalType> s2 = new AtomicReference<>();
  StepVerifier.create(new MonoTakeUntilOther<>(Mono.never().doFinally(s1::set),
      Mono.never().doFinally(s2::set)))
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(100))
        .thenCancel()
        .verify(Duration.ofMillis(500));
  assertThat(s1.get()).isEqualTo(SignalType.CANCEL);
  assertThat(s2.get()).isEqualTo(SignalType.CANCEL);
}

代码示例来源:origin: lettuce-io/lettuce-core

public <T> Mono<T> thenError(Throwable t) {
  return Mono.defer(() -> {
    if (firstCloseLatch()) {
      return Mono.fromCompletionStage(closeable.closeAsync());
    }
    return Mono.empty();
  }).then(Mono.<T> error(t)).doFinally(s -> {
    if (firstCloseLatch()) {
      closeable.closeAsync();
    }
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
//see https://github.com/reactor/reactor-core/issues/951
public void gh951_withoutDoOnError() {
  List<String> events = new ArrayList<>();
  Assertions.assertThatExceptionOfType(UnsupportedOperationException.class)
       .isThrownBy(Mono.just(true)
               .map(this::throwError)
               .doFinally(any -> events.add("doFinally " + any.toString()))
               ::subscribe)
       .withMessage("java.lang.IllegalStateException: boom");
  Assertions.assertThat(events)
       .as("withoutDoOnError")
       .containsExactly("doFinally onError");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void blockOnNeverResourceCanBeCancelled() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  Disposable disposable = Mono.usingWhen(Mono.<String>never(),
      Mono::just,
      Flux::just,
      Flux::just,
      Flux::just)
                .doFinally(f -> latch.countDown())
                .subscribe();
  assertThat(latch.await(500, TimeUnit.MILLISECONDS))
      .as("hangs before dispose").isFalse();
  disposable.dispose();
  assertThat(latch.await(100, TimeUnit.MILLISECONDS))
      .as("terminates after dispose").isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void neverSourceIsCancelled() {
  AtomicReference<SignalType> signal = new AtomicReference<>();
  StepVerifier.withVirtualTime(() ->
      new MonoTakeUntilOther<>(Mono.never().doFinally(signal::set), Mono.delay(Duration.ofMillis(100)))
  )
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(100))
        .verifyComplete();
  assertThat(signal.get()).isEqualTo(SignalType.CANCEL);
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void sessionClosing() throws Exception {
  this.client.execute(getUrl("/close"),
      session -> {
        logger.debug("Starting..");
        return session.receive()
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doFinally(signalType -> {
              logger.debug("Completed with: " + signalType);
            });
      })
      .block(Duration.ofMillis(5000));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void callbackThrows() {
  try {
    StepVerifier.create(Mono.just(1)
                .doFinally(signal -> {
                  throw new IllegalStateException();
                }))
          .expectNext(1)
          .expectComplete()
          .verify();
  }
  catch (Throwable e) {
    Throwable _e = Exceptions.unwrap(e);
    assertNotSame(e, _e);
    assertThat(_e, is(instanceOf(IllegalStateException.class)));
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalCancel() {
  AtomicBoolean cancelCheck = new AtomicBoolean(false);
  StepVerifier.create(Mono.just(1).hide()
              .doOnCancel(() -> cancelCheck.set(true))
              .doFinally(this))
        .expectNoFusionSupport()
        .expectNext(1)
        .thenCancel()
        .verify();
  
  assertEquals("expected doFinally to be invoked exactly once", 1, calls);
  assertEquals(SignalType.CANCEL, signalType);
  assertTrue("expected tested mono to be cancelled", cancelCheck.get());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelFutureImmediatelyCancelledLoop() {
  for (int i = 0; i < 10000; i++) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    Mono<Integer> mono = Mono
        .fromFuture(future)
        .doFinally(sig -> {
          if (sig == SignalType.CANCEL) future.cancel(false);
        });
    StepVerifier.create(mono)
          .expectSubscription()
          .thenCancel()
          .verifyThenAssertThat()
          .hasNotDroppedErrors();
    assertThat(future).isCancelled();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalEmpty() {
  StepVerifier.create(Mono.empty().doFinally(this))
        .expectNoFusionSupport()
        .expectComplete()
        .verify();
  assertEquals(1, calls);
  assertEquals(SignalType.ON_COMPLETE, signalType);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalError() {
  StepVerifier.create(Mono.error(new IllegalArgumentException()).doFinally(this))
        .expectNoFusionSupport()
        .expectError(IllegalArgumentException.class)
        .verify();
  assertEquals(1, calls);
  assertEquals(SignalType.ON_ERROR, signalType);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncFused() {
  StepVerifier.create(Mono.just(1).doFinally(this))
        .expectFusion(SYNC)
        .expectNext(1)
        .expectComplete()
        .verify();
  assertEquals(1, calls); assertEquals(SignalType.ON_COMPLETE, signalType);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncFusedConditional() {
  StepVerifier.create(Mono.just(1).doFinally(this).filter(i -> true))
        .expectFusion(SYNC)
        .expectNext(1)
        .expectComplete()
        .verify();
  assertEquals(1, calls); assertEquals(SignalType.ON_COMPLETE, signalType);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalJust() {
  StepVerifier.create(Mono.just(1).hide().doFinally(this))
        .expectNoFusionSupport()
        .expectNext(1)
        .expectComplete()
        .verify();
  assertEquals(1, calls);
  assertEquals(SignalType.ON_COMPLETE, signalType);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalJustConditional() {
  StepVerifier.create(Mono.just(1)
              .hide()
              .doFinally(this)
              .filter(i -> true))
        .expectNoFusionSupport()
        .expectNext(1)
        .expectComplete()
        .verify();
  assertEquals(1, calls);
  assertEquals(SignalType.ON_COMPLETE, signalType);
}

相关文章

微信公众号

最新文章

更多

Mono类方法