reactor.core.publisher.Flux.doOnCancel()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(603)

本文整理了Java中reactor.core.publisher.Flux.doOnCancel()方法的一些代码示例,展示了Flux.doOnCancel()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.doOnCancel()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doOnCancel

Flux.doOnCancel介绍

[英]Add behavior (side-effect) triggered when the Flux is cancelled.
[中]添加流量取消时触发的行为(副作用)。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public Flux<DataBuffer> getBody() {
  return this.inbound.receive()
      .doOnSubscribe(s -> {
        if (this.rejectSubscribers.get()) {
          throw new IllegalStateException("The client response body can only be consumed once.");
        }
      })
      .doOnCancel(() -> {
        // https://github.com/reactor/reactor-netty/issues/503
        // FluxReceive rejects multiple subscribers, but not after a cancel().
        // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
        // So we need to intercept and reject them in that case.
        this.rejectSubscribers.set(true);
      })
      .map(byteBuf -> {
        byteBuf.retain();
        return this.bufferFactory.wrap(byteBuf);
      });
}

代码示例来源:origin: spring-projects/spring-framework

public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
    @Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
  if (publisher != null && publisherNested != null) {
    throw new IllegalArgumentException("At most one publisher expected");
  }
  this.publisher = publisher != null ?
      Flux.from(publisher)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .doOnNext(this.buffer::write)
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  this.publisherNested = publisherNested != null ?
      Flux.from(publisherNested)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  if (publisher == null && publisherNested == null) {
    this.content.onComplete();
  }
}

代码示例来源:origin: spring-projects/spring-framework

@GetMapping("/infinite")
  Flux<String> infinite() {
    return Flux.just(0, 1).map(l -> "foo " + l)
        .mergeWith(Flux.never())
        .doOnCancel(() -> cancellation.onComplete());
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCancelledByMonoProcessor() {
  AtomicLong cancelCounter = new AtomicLong();
  Flux.range(1, 10)
    .doOnCancel(cancelCounter::incrementAndGet)
    .publishNext()
    .subscribe();
  assertThat(cancelCounter.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void hasElementsCancel() throws InterruptedException {
  AtomicBoolean cancelled = new AtomicBoolean();
  Flux.just("foo", "bar").hide()
    .doOnCancel(() -> cancelled.set(true))
    .hasElements()
    .subscribe(v -> {}, e -> {}, () -> {},
        Subscription::cancel);
  assertThat(cancelled.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

private void subscribe(Flux<Flux<Integer>> windows) {
  mainSubscriber = AssertSubscriber.create();
  windows.doOnCancel(() -> mainCancelled.incrementAndGet())
      .doOnComplete(() -> mainCompleted.incrementAndGet())
      .doOnTerminate(() -> mainTerminated.incrementAndGet()).subscribe(mainSubscriber);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxBlockFirstCancelsOnce() {
  AtomicLong cancelCount = new AtomicLong();
  Flux.range(1, 10)
    .doOnCancel(cancelCount::incrementAndGet)
    .blockFirst();
  assertThat(cancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxBlockLastDoesntCancel() {
  AtomicLong cancelCount = new AtomicLong();
  Flux.range(1, 10)
    .doOnCancel(cancelCount::incrementAndGet)
    .blockLast();
  assertThat(cancelCount.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void differenceCancelsBothSources() {
  AtomicBoolean sub1 = new AtomicBoolean();
  AtomicBoolean sub2 = new AtomicBoolean();
  Flux<Integer> source1 = Flux.range(1, 5).doOnCancel(() -> sub1.set(true));
  Flux<Integer> source2 = Flux.just(1, 2, 3, 7, 8).doOnCancel(() -> sub2.set(true));
  StepVerifier.create(Mono.sequenceEqual(source1, source2))
        .expectNext(Boolean.FALSE)
        .verifyComplete();
  Assert.assertTrue("left not cancelled", sub1.get());
  Assert.assertTrue("right not cancelled", sub2.get());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void innerFluxCancelled() {
  AtomicInteger cancelCount = new AtomicInteger();
  StepVerifier.create(Mono.just(1)
              .filterWhen(v -> Flux.just(true, false, false)
                         .doOnCancel(cancelCount::incrementAndGet)))
        .expectNext(1)
        .verifyComplete();
  assertThat(cancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelsOther() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Flux<Integer> when = Flux.range(1, 10)
               .doOnCancel(() -> cancelled.set(true));
  StepVerifier.create(justError.retryWhen(other -> when))
        .thenCancel()
        .verify();
  assertThat(cancelled.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceHasMultipleValuesNotCancelled() {
  AtomicBoolean triggerCancelled = new AtomicBoolean();
  StepVerifier.create(Mono.just("foo")
              .delayUntil(
                  a -> Flux.just(1, 2, 3).hide()
                       .doOnCancel(() -> triggerCancelled.set(true))))
        .expectNext("foo")
        .verifyComplete();
  assertThat(triggerCancelled.get()).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Override
public Flux<T> flux() {
  return Flux.from(delegate)
        .doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
        .doOnCancel(() -> incrementAndGet(CANCELLED))
        .doOnRequest(l -> incrementAndGet(REQUESTED));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxSourceIsCancelled() {
  AtomicLong cancelCount = new AtomicLong();
  StepVerifier.create(Flux.range(1, 10)
              .doOnCancel(cancelCount::incrementAndGet)
              .hasElements())
        .expectNext(true)
        .verifyComplete();
  assertThat(cancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void largerSourceCancelled() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Flux<Integer> test = Flux.range(1, 1000)
      .doOnCancel(() -> cancelled.set(true))
      .limitRequest(3);
  StepVerifier.create(test)
        .expectNextCount(3)
        .verifyComplete();
  assertThat(cancelled.get()).as("source is cancelled").isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelsOther() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Flux<Integer> when = Flux.range(1, 10)
               .doOnCancel(() -> cancelled.set(true));
  StepVerifier.create(Flux.just(1).repeatWhen(other -> when))
        .thenCancel()
        .verify();
  assertThat(cancelled.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void triggerSequenceHasMultipleValuesNotCancelled() {
  AtomicBoolean triggerCancelled = new AtomicBoolean();
  StepVerifier.create(Flux.just("foo")
              .delayUntil(
                  a -> Flux.just(1, 2, 3).hide()
                       .doOnCancel(() -> triggerCancelled.set(true))))
        .expectNext("foo")
        .verifyComplete();
  assertThat(triggerCancelled.get()).isFalse();
}

代码示例来源:origin: reactor/reactor-core

private void expectWindow(int index, Predicate<? super Integer> innerCancelPredicate, List<Integer> values) {
  AssertSubscriber<Integer> s = AssertSubscriber.create();
  mainSubscriber.values().get(index)
      .doOnCancel(() -> innerCancelled.incrementAndGet())
      .doOnComplete(() -> {
        innerCompleted.incrementAndGet();})
      .doOnTerminate(() -> innerTerminated.incrementAndGet())
      .takeWhile(innerCancelPredicate).subscribe(s);
  s.assertValueSequence(values).assertNoError();
  innerCreated.incrementAndGet();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void innerFluxCancelled() {
  AtomicInteger cancelCount = new AtomicInteger();
  StepVerifier.create(Flux.range(1, 3)
              .filterWhen(v -> Flux.just(true, false, false)
                         .doOnCancel(cancelCount::incrementAndGet)))
        .expectNext(1, 2, 3)
        .verifyComplete();
  assertThat(cancelCount.get()).isEqualTo(3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noCancelOnUnexpectedCompleteSignal() {
  LongAdder cancelled = new LongAdder();
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.create(Flux.empty()
                .doOnCancel(cancelled::increment))
          .expectError()
          .verify())
      .withMessageContaining("expected: onError(); actual: onComplete()");
  assertThat(cancelled.intValue())
      .overridingErrorMessage("the expectError assertion caused a cancellation")
      .isZero();
}

相关文章

微信公众号

最新文章

更多

Flux类方法