Usage and code examples of the reactor.core.publisher.Mono.doOnSubscribe() method

Reposted by x33g5p2x on 2022-01-24 in Other

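This article collects Java code examples of the reactor.core.publisher.Mono.doOnSubscribe() method and shows how Mono.doOnSubscribe() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of Mono.doOnSubscribe():
Package: reactor.core.publisher
Class: Mono
Method: doOnSubscribe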

About Mono.doOnSubscribe

Adds behavior (a side effect) that is triggered when the Mono is subscribed to, that is, when the Subscription is passed to the subscriber's onSubscribe.

This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable#dispose() on the Disposable returned by Mono#subscribe().
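
The two points in the description above can be illustrated with a minimal sketch. It assumes reactor-core is on the classpath; the class name DoOnSubscribeSketch and its two helper methods are hypothetical and exist only for this illustration. The first method uses doOnSubscribe purely for monitoring (counting subscriptions), and the second cancels through the Disposable returned by subscribe() rather than through a captured Subscription.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;

// Hypothetical illustration class, not part of reactor-core.
public class DoOnSubscribeSketch {

    // Uses doOnSubscribe as a monitoring side effect: counts subscriptions.
    static int countSubscriptions(int times) {
        AtomicInteger subscriptions = new AtomicInteger();
        Mono<String> greeting = Mono.just("hello")
                .doOnSubscribe(sub -> subscriptions.incrementAndGet());
        // The hook runs once per subscription, not when the chain is assembled.
        for (int i = 0; i < times; i++) {
            greeting.subscribe();
        }
        return subscriptions.get();
    }

    // Cancels via the Disposable returned by subscribe(), as the description recommends.
    static boolean cancelViaDisposable() {
        Disposable disposable = Mono.<String>never().subscribe();
        disposable.dispose();
        return disposable.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(countSubscriptions(3));
        System.out.println(cancelViaDisposable());
    }
}
```

Because Mono.just emits synchronously, each subscribe() call in the loop has already completed by the time the counter is read.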

Code examples

Example source: reactor/reactor-core

// Passes the subscription hook straight through to Mono.doOnSubscribe.
@Override
protected Mono<O> doOnSubscribe(Mono<O> output,
    Consumer<? super Subscription> doOnSubscribe) {
  return output.doOnSubscribe(doOnSubscribe);
}

Example source: reactor/reactor-core

@Test
public void testCommandEmptyPathIsUsedBoilerplate() {
  AtomicBoolean wasInvoked = new AtomicBoolean();
  AtomicBoolean wasRequested = new AtomicBoolean();
  Mono<Void> testFallback = Mono.<Void>empty()
      .doOnSubscribe(s -> wasInvoked.set(true))
      .doOnRequest(l -> wasRequested.set(true));
  processOrFallback(Mono.empty(), testFallback).subscribe();
  assertThat(wasInvoked.get()).isTrue();
  assertThat(wasRequested.get()).isTrue();
}

Example source: reactor/reactor-core

@Test
public void monoToProcessorChainColdToHot() {
  AtomicInteger subscriptionCount = new AtomicInteger();
  Mono<String> coldToHot = Mono.just("foo")
                 .doOnSubscribe(sub -> subscriptionCount.incrementAndGet())
                 .cache()
                 .toProcessor() //this actually subscribes
                 .filter(s -> s.length() < 4);
  assertThat(subscriptionCount.get()).isEqualTo(1);
  coldToHot.block();
  coldToHot.block();
  coldToHot.block();
  assertThat(subscriptionCount.get()).isEqualTo(1);
}

Example source: reactor/reactor-core

@Test
public void onMonoDoOnSubscribe() {
  Mono<String> mp = Mono.just("test");
  AtomicReference<Subscription> ref = new AtomicReference<>();
  StepVerifier.create(mp.doOnSubscribe(ref::set))
        .expectNext("test")
        .verifyComplete();
  assertThat(ref.get()).isNotNull();
}

Example source: reactor/reactor-core

@Override
public Mono<T> mono() {
  return Mono.from(delegate)
        .doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
        .doOnCancel(() -> incrementAndGet(CANCELLED))
        .doOnRequest(l -> incrementAndGet(REQUESTED));
}

Example source: reactor/reactor-core

@Test
public void operatorChainWithLastOperator() {
  Mono<String> m =
      Flux.concat(Mono.just("foo"), Mono.just("bar"))
        .map(a -> a)
        .filter(a -> true)
        .reduce((a, b) -> b)
        .doOnSubscribe(sub -> {});
  Scannable operator = Scannable.from(m);
  assertThat(operator.isScanAvailable()).as("operator.isScanAvailable").isTrue();
  assertThat(operator.steps())
      .containsExactly("source(FluxConcatArray)", "map", "filter", "reduce", "peek");
}

Example source: reactor/reactor-core

@Test
public void operatorChainWithLastSubscriber() {
  AtomicReference<Subscription> subRef = new AtomicReference<>(null);
  Mono<String> m =
      Flux.just("foo")
        .map(a -> a)
        .filter(a -> true)
        .reduce((a, b) -> b)
        .doOnSubscribe(subRef::set);
  m.subscribe();
  Scannable lastSubscriber = Scannable.from(subRef.get());
  assertThat(lastSubscriber.isScanAvailable()).as("lastSubscriber.isScanAvailable").isTrue();
  Stream<String> chain = lastSubscriber.steps();
  assertThat(chain).containsExactly("just", "map", "filter", "reduce", "peek", "lambda");
}

Example source: reactor/reactor-core

@Test
public void totalCancelCanResubscribe() {
  AtomicInteger cancelled = new AtomicInteger();
  AtomicInteger subscribed = new AtomicInteger();
  TestPublisher<Integer> source = TestPublisher.create();
  Mono<Integer> cached = source.mono()
                 .doOnSubscribe(s -> subscribed.incrementAndGet())
                 .doOnCancel(cancelled::incrementAndGet)
                 .cache(Duration.ofMillis(200));
  Disposable d1 = cached.subscribe();
  Disposable d2 = cached.subscribe();
  d1.dispose();
  d2.dispose();
  assertThat(cancelled.get()).isEqualTo(0);
  assertThat(subscribed.get()).isEqualTo(1);
  StepVerifier.create(cached)
        .then(() -> source.emit(100))
        .expectNext(100)
        .verifyComplete();
  assertThat(cancelled.get()).isEqualTo(0);
  assertThat(subscribed.get()).isEqualTo(1);
}

Example source: reactor/reactor-core

@Test
public void usesFluxDoOnEachConditionalSubscriber() {
  AtomicReference<Scannable> ref = new AtomicReference<>();
  Mono<String> source = Mono.just("foo")
               .doOnSubscribe(sub -> ref.set(Scannable.from(sub)))
               .hide()
               .filter(t -> true);
  final MonoDoOnEach<String> test =
      new MonoDoOnEach<>(source, s -> { });
  test.filter(t -> true)
    .subscribe();
  Class expected = FluxDoOnEach.DoOnEachConditionalSubscriber.class;
  assertThat(ref.get()
         .actuals()
         .map(Object::getClass)
  )
      .contains(expected);
}

Example source: reactor/reactor-core

@Test
public void cancelThenFutureFails() {
  CompletableFuture<Integer> future = new CompletableFuture<>();
  AtomicReference<Subscription> subRef = new AtomicReference<>();
  Mono<Integer> mono = Mono
      .fromFuture(future)
      .doOnSubscribe(subRef::set);
  StepVerifier.create(mono)
        .expectSubscription()
        .then(() -> {
          subRef.get().cancel();
          future.completeExceptionally(new IllegalStateException("boom"));
          future.complete(1);
        })
        .thenCancel()//already cancelled but need to get to verification
        .verifyThenAssertThat()
        .hasDroppedErrorWithMessage("boom");
}

Example source: reactor/reactor-core

@Test
public void monoCreateCancelOnNext() {
  AtomicInteger onCancel = new AtomicInteger();
  AtomicInteger onDispose = new AtomicInteger();
  AtomicReference<Subscription> subscription = new AtomicReference<>();
  Mono<String> created = Mono.create(s -> {
    s.onDispose(onDispose::getAndIncrement)
     .onCancel(onCancel::getAndIncrement)
     .success("done");
  });
  created = created.doOnSubscribe(s -> subscription.set(s))
           .doOnNext(n -> subscription.get().cancel());
  StepVerifier.create(created)
        .expectNext("done")
        .verifyComplete();
  assertThat(onDispose.get()).isEqualTo(1);
  assertThat(onCancel.get()).isEqualTo(0);
}

Example source: reactor/reactor-core

@Test
public void upstreamIsDelayedSource() {
  AtomicReference<Object> upstream = new AtomicReference<>();
  StepVerifier.withVirtualTime(() -> Mono.just(1)
      .delayElement(Duration.ofSeconds(2), defaultSchedulerForDelay())
      .doOnSubscribe(s -> {
        assertThat(s).isInstanceOf(MonoDelayElement.DelayElementSubscriber.class);
        MonoDelayElement.DelayElementSubscriber delayedSubscriber =
            (MonoDelayElement.DelayElementSubscriber) s;
        upstream.set(delayedSubscriber.scan(Scannable.Attr.PARENT));
      }))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext(1)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void smokeTestDelay() {
  for (int i = 0; i < 20; i++) {
    Scheduler s = Schedulers.newElastic("test");
    AtomicLong start = new AtomicLong();
    AtomicLong end = new AtomicLong();
    try {
      StepVerifier.create(Mono
          .delay(Duration.ofMillis(100), s)
          .doOnSubscribe(sub -> start.set(System.nanoTime()))
          .doOnTerminate(() -> end.set(System.nanoTime()))
      )
            .expectSubscription()
            .expectNext(0L)
            .verifyComplete();
      long endValue = end.longValue();
      long startValue = start.longValue();
      long measuredDelay = endValue - startValue;
      long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
      assertThat(measuredDelayMs)
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      s.dispose();
    }
  }
}

Example source: reactor/reactor-core

@Test
public void smokeTestDelay() {
  for (int i = 0; i < 20; i++) {
    Scheduler s = Schedulers.newParallel("test");
    AtomicLong start = new AtomicLong();
    AtomicLong end = new AtomicLong();
    try {
      StepVerifier.create(Mono
          .delay(Duration.ofMillis(100), s)
          .doOnSubscribe(sub -> start.set(System.nanoTime()))
          .doOnTerminate(() -> end.set(System.nanoTime()))
      )
            .expectSubscription()
            .expectNext(0L)
            .verifyComplete();
      long endValue = end.longValue();
      long startValue = start.longValue();
      long measuredDelay = endValue - startValue;
      long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
      assertThat(measuredDelayMs)
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      s.dispose();
    }
  }
}

Example source: reactor/reactor-core

@Test
public void smokeTestDelay() {
  for (int i = 0; i < 20; i++) {
    Scheduler s = Schedulers.fromExecutorService(Executors.newScheduledThreadPool(1));
    AtomicLong start = new AtomicLong();
    AtomicLong end = new AtomicLong();
    try {
      StepVerifier.create(Mono
          .delay(Duration.ofMillis(100), s)
          .log()
          .doOnSubscribe(sub -> start.set(System.nanoTime()))
          .doOnTerminate(() -> end.set(System.nanoTime()))
      )
            .expectSubscription()
            .expectNext(0L)
            .verifyComplete();
      long endValue = end.longValue();
      long startValue = start.longValue();
      long measuredDelay = endValue - startValue;
      long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
      assertThat(measuredDelayMs)
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      s.dispose();
    }
  }
}

Example source: reactor/reactor-core

@Test
public void smokeTestDelay() {
  for (int i = 0; i < 20; i++) {
    Scheduler s = Schedulers.newSingle("test");
    AtomicLong start = new AtomicLong();
    AtomicLong end = new AtomicLong();
    try {
      StepVerifier.create(Mono
          .delay(Duration.ofMillis(100), s)
          .log()
          .doOnSubscribe(sub -> start.set(System.nanoTime()))
          .doOnTerminate(() -> end.set(System.nanoTime()))
      )
            .expectSubscription()
            .expectNext(0L)
            .verifyComplete();
      long endValue = end.longValue();
      long startValue = start.longValue();
      long measuredDelay = endValue - startValue;
      long measuredDelayMs = TimeUnit.NANOSECONDS.toMillis(measuredDelay);
      assertThat(measuredDelayMs)
          .as("iteration %s, measured delay %s nanos, start at %s nanos, end at %s nanos", i, measuredDelay, startValue, endValue)
          .isGreaterThanOrEqualTo(100L)
          .isLessThan(200L);
    }
    finally {
      s.dispose();
    }
  }
}

Example source: rsocket/rsocket-java

private void handleFireAndForget(int streamId, Mono<Void> result) {
 result
   .doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription))
   .doFinally(signalType -> sendingSubscriptions.remove(streamId))
   .subscribe(null, errorConsumer);
}

Example source: rsocket/rsocket-java

@Override
public final Mono<Payload> requestResponse(Payload payload) {
 return delegate.requestResponse(payload).doOnSubscribe(s -> rrCount.incrementAndGet());
}

Example source: rsocket/rsocket-java

@Override
public final Mono<Void> fireAndForget(Payload payload) {
 return delegate.fireAndForget(payload).doOnSubscribe(s -> fnfCount.incrementAndGet());
}
