本文整理了 Java 中 reactor.core.publisher.Flux.doOnCancel() 方法的一些代码示例,展示了 Flux.doOnCancel() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flux.doOnCancel() 方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doOnCancel
[英]Add behavior (side-effect) triggered when the Flux is cancelled.
[中]添加在 Flux 被取消时触发的行为(副作用)。
代码示例来源:origin: spring-projects/spring-framework
/**
 * Exposes the inbound content as a stream of {@link DataBuffer}s.
 *
 * <p>Once the stream has been cancelled, the {@code rejectSubscribers} flag is
 * set and every later subscription fails with an {@link IllegalStateException}.
 * Each received buffer is retained before being wrapped by the buffer factory.
 */
@Override
public Flux<DataBuffer> getBody() {
    return this.inbound.receive()
            .doOnSubscribe(subscription -> {
                boolean alreadyCancelled = this.rejectSubscribers.get();
                if (alreadyCancelled) {
                    throw new IllegalStateException("The client response body can only be consumed once.");
                }
            })
            .doOnCancel(() ->
                    // See https://github.com/reactor/reactor-netty/issues/503
                    // FluxReceive refuses additional subscribers, except after a cancel():
                    // a subscriber arriving after cancel() is not rejected and hangs instead.
                    // Remember the cancellation here so such subscribers are rejected above.
                    this.rejectSubscribers.set(true))
            .map(nettyBuffer -> {
                nettyBuffer.retain();
                return this.bufferFactory.wrap(nettyBuffer);
            });
}
代码示例来源:origin: spring-projects/spring-framework
/**
 * Creates a recorder around at most one of the two given publishers.
 *
 * <p>The wrapped publisher flags {@code hasContentConsumer} on subscription,
 * writes every emitted buffer into {@code buffer}, routes errors to
 * {@code handleOnError}, and routes both cancellation and completion to
 * {@code handleOnComplete}. When neither publisher is given, {@code content}
 * is completed right away.
 *
 * @param publisher a flat publisher of buffers, or {@code null}
 * @param publisherNested a publisher of buffer publishers, or {@code null}
 * @throws IllegalArgumentException if both publishers are non-null
 */
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
        @Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {

    if (publisher != null && publisherNested != null) {
        throw new IllegalArgumentException("At most one publisher expected");
    }

    if (publisher != null) {
        this.publisher = Flux.from(publisher)
                .doOnSubscribe(subscription -> {
                    this.hasContentConsumer = true;
                })
                .doOnNext(this.buffer::write)
                .doOnError(this::handleOnError)
                .doOnCancel(this::handleOnComplete)
                .doOnComplete(this::handleOnComplete);
    }
    else {
        this.publisher = null;
    }

    if (publisherNested != null) {
        this.publisherNested = Flux.from(publisherNested)
                .doOnSubscribe(subscription -> {
                    this.hasContentConsumer = true;
                })
                // Each inner publisher is recorded individually.
                .map(inner -> Flux.from(inner)
                        .doOnNext(this.buffer::write)
                        .doOnError(this::handleOnError))
                .doOnError(this::handleOnError)
                .doOnCancel(this::handleOnComplete)
                .doOnComplete(this::handleOnComplete);
    }
    else {
        this.publisherNested = null;
    }

    // Nothing to record: signal completion of the content immediately.
    if (publisher == null && publisherNested == null) {
        this.content.onComplete();
    }
}
代码示例来源:origin: spring-projects/spring-framework
/**
 * Emits {@code "foo 0"} and {@code "foo 1"} and then never completes.
 * Cancelling the stream completes {@code cancellation}.
 */
@GetMapping("/infinite")
Flux<String> infinite() {
    Flux<String> greetings = Flux.just(0, 1).map(index -> "foo " + index);
    return greetings
            .mergeWith(Flux.never())
            .doOnCancel(() -> {
                cancellation.onComplete();
            });
}
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribing through {@code publishNext()} to a ten-element range is
 * expected to cancel the upstream exactly once.
 */
@Test
public void fluxCancelledByMonoProcessor() {
    AtomicLong cancellations = new AtomicLong();

    Flux<Integer> source = Flux.range(1, 10)
            .doOnCancel(() -> cancellations.incrementAndGet());
    source.publishNext().subscribe();

    assertThat(cancellations.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Cancels the {@code hasElements()} subscription as soon as it is received
 * and checks that the cancellation reached the source.
 */
@Test
public void hasElementsCancel() throws InterruptedException {
    AtomicBoolean sourceCancelled = new AtomicBoolean();

    Flux<String> source = Flux.just("foo", "bar")
            .hide()
            .doOnCancel(() -> sourceCancelled.set(true));

    source.hasElements()
            .subscribe(value -> {},
                    error -> {},
                    () -> {},
                    subscription -> subscription.cancel());

    assertThat(sourceCancelled.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Installs a fresh {@code mainSubscriber} and subscribes it to the given
 * windows, counting cancel, complete and terminate signals on the way.
 *
 * @param windows the outer sequence of windows to observe
 */
private void subscribe(Flux<Flux<Integer>> windows) {
    mainSubscriber = AssertSubscriber.create();

    Flux<Flux<Integer>> instrumented = windows
            .doOnCancel(() -> {
                mainCancelled.incrementAndGet();
            })
            .doOnComplete(() -> {
                mainCompleted.incrementAndGet();
            })
            .doOnTerminate(() -> {
                mainTerminated.incrementAndGet();
            });

    instrumented.subscribe(mainSubscriber);
}
代码示例来源:origin: reactor/reactor-core
/**
 * {@code blockFirst()} on a ten-element range is expected to trigger
 * exactly one cancellation of the source.
 */
@Test
public void fluxBlockFirstCancelsOnce() {
    AtomicLong cancellations = new AtomicLong();

    Flux<Integer> source = Flux.range(1, 10)
            .doOnCancel(() -> cancellations.incrementAndGet());
    source.blockFirst();

    assertThat(cancellations.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * {@code blockLast()} on a ten-element range is expected to leave the
 * cancellation counter at zero.
 */
@Test
public void fluxBlockLastDoesntCancel() {
    AtomicLong cancellations = new AtomicLong();

    Flux<Integer> source = Flux.range(1, 10)
            .doOnCancel(() -> cancellations.incrementAndGet());
    source.blockLast();

    assertThat(cancellations.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Compares two sequences that differ and checks that
 * {@code Mono.sequenceEqual} emits {@code false} and cancels both inputs.
 */
@Test
public void differenceCancelsBothSources() {
    AtomicBoolean leftCancelled = new AtomicBoolean();
    AtomicBoolean rightCancelled = new AtomicBoolean();

    Flux<Integer> left = Flux.range(1, 5)
            .doOnCancel(() -> leftCancelled.set(true));
    Flux<Integer> right = Flux.just(1, 2, 3, 7, 8)
            .doOnCancel(() -> rightCancelled.set(true));

    StepVerifier.create(Mono.sequenceEqual(left, right))
            .expectNext(Boolean.FALSE)
            .verifyComplete();

    Assert.assertTrue("left not cancelled", leftCancelled.get());
    Assert.assertTrue("right not cancelled", rightCancelled.get());
}
代码示例来源:origin: reactor/reactor-core
/**
 * Uses a multi-valued Flux as the {@code filterWhen} predicate of a Mono and
 * checks that the value passes through and the inner Flux is cancelled once.
 */
@Test
public void innerFluxCancelled() {
    AtomicInteger cancellations = new AtomicInteger();

    Mono<Integer> filtered = Mono.just(1)
            .filterWhen(value -> Flux.just(true, false, false)
                    .doOnCancel(() -> cancellations.incrementAndGet()));

    StepVerifier.create(filtered)
            .expectNext(1)
            .verifyComplete();

    assertThat(cancellations.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Cancels a {@code retryWhen} sequence and checks that the companion
 * publisher returned by the retry function is cancelled too.
 */
@Test
public void cancelsOther() {
    AtomicBoolean companionCancelled = new AtomicBoolean();
    Flux<Integer> companion = Flux.range(1, 10)
            .doOnCancel(() -> companionCancelled.set(true));

    StepVerifier.create(justError.retryWhen(errors -> companion))
            .thenCancel()
            .verify();

    assertThat(companionCancelled.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Delays a Mono with a three-element trigger and checks that the value is
 * emitted and the trigger is not cancelled.
 */
@Test
public void triggerSequenceHasMultipleValuesNotCancelled() {
    AtomicBoolean cancelFlag = new AtomicBoolean();

    Mono<String> delayed = Mono.just("foo")
            .delayUntil(value -> Flux.just(1, 2, 3)
                    .hide()
                    .doOnCancel(() -> cancelFlag.set(true)));

    StepVerifier.create(delayed)
            .expectNext("foo")
            .verifyComplete();

    assertThat(cancelFlag.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Returns the delegate as a {@link Flux} that bumps the {@code SUBSCRIBED},
 * {@code CANCELLED} and {@code REQUESTED} counters on the matching signals.
 */
@Override
public Flux<T> flux() {
    return Flux.from(delegate)
            .doOnSubscribe(subscription -> {
                incrementAndGet(SUBSCRIBED);
            })
            .doOnCancel(() -> {
                incrementAndGet(CANCELLED);
            })
            .doOnRequest(requested -> {
                incrementAndGet(REQUESTED);
            });
}
代码示例来源:origin: reactor/reactor-core
/**
 * {@code hasElements()} on a ten-element range is expected to emit
 * {@code true} and cancel the source exactly once.
 */
@Test
public void fluxSourceIsCancelled() {
    AtomicLong cancellations = new AtomicLong();
    Flux<Integer> source = Flux.range(1, 10)
            .doOnCancel(() -> cancellations.incrementAndGet());

    StepVerifier.create(source.hasElements())
            .expectNext(true)
            .verifyComplete();

    assertThat(cancellations.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Limits a 1000-element range to three requests and checks that three values
 * arrive, the sequence completes, and the source is cancelled.
 */
@Test
public void largerSourceCancelled() {
    AtomicBoolean sourceCancelled = new AtomicBoolean();
    Flux<Integer> source = Flux.range(1, 1000)
            .doOnCancel(() -> sourceCancelled.set(true));

    StepVerifier.create(source.limitRequest(3))
            .expectNextCount(3)
            .verifyComplete();

    assertThat(sourceCancelled.get()).as("source is cancelled").isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Cancels a {@code repeatWhen} sequence and checks that the companion
 * publisher returned by the repeat function is cancelled too.
 */
@Test
public void cancelsOther() {
    AtomicBoolean companionCancelled = new AtomicBoolean();
    Flux<Integer> companion = Flux.range(1, 10)
            .doOnCancel(() -> companionCancelled.set(true));

    StepVerifier.create(Flux.just(1).repeatWhen(completions -> companion))
            .thenCancel()
            .verify();

    assertThat(companionCancelled.get()).isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Delays a single-element Flux with a three-element trigger and checks that
 * the value is emitted and the trigger is not cancelled.
 */
@Test
public void triggerSequenceHasMultipleValuesNotCancelled() {
    AtomicBoolean cancelFlag = new AtomicBoolean();

    Flux<String> delayed = Flux.just("foo")
            .delayUntil(value -> Flux.just(1, 2, 3)
                    .hide()
                    .doOnCancel(() -> cancelFlag.set(true)));

    StepVerifier.create(delayed)
            .expectNext("foo")
            .verifyComplete();

    assertThat(cancelFlag.get()).isFalse();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to the window that {@code mainSubscriber} received at the given
 * position, counts its cancel, complete and terminate signals, and asserts
 * the values it produced. Increments {@code innerCreated} afterwards.
 *
 * @param index position of the window among the values seen by {@code mainSubscriber}
 * @param innerCancelPredicate passed to {@code takeWhile} on the window
 * @param values the exact value sequence expected from the window
 */
private void expectWindow(int index, Predicate<? super Integer> innerCancelPredicate, List<Integer> values) {
    AssertSubscriber<Integer> windowSubscriber = AssertSubscriber.create();

    mainSubscriber.values()
            .get(index)
            .doOnCancel(() -> {
                innerCancelled.incrementAndGet();
            })
            .doOnComplete(() -> {
                innerCompleted.incrementAndGet();
            })
            .doOnTerminate(() -> {
                innerTerminated.incrementAndGet();
            })
            .takeWhile(innerCancelPredicate)
            .subscribe(windowSubscriber);

    windowSubscriber.assertValueSequence(values)
            .assertNoError();
    innerCreated.incrementAndGet();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Uses a multi-valued Flux as the {@code filterWhen} predicate for three
 * source values and checks that all three pass and that three inner
 * cancellations are counted.
 */
@Test
public void innerFluxCancelled() {
    AtomicInteger cancellations = new AtomicInteger();

    Flux<Integer> filtered = Flux.range(1, 3)
            .filterWhen(value -> Flux.just(true, false, false)
                    .doOnCancel(() -> cancellations.incrementAndGet()));

    StepVerifier.create(filtered)
            .expectNext(1, 2, 3)
            .verifyComplete();

    assertThat(cancellations.get()).isEqualTo(3);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Expects an error from an empty Flux, which must fail with an
 * {@link AssertionError} describing the unexpected completion, and checks
 * that the failed expectation did not cancel the source.
 */
@Test
public void noCancelOnUnexpectedCompleteSignal() {
    LongAdder cancellations = new LongAdder();

    assertThatExceptionOfType(AssertionError.class)
            .isThrownBy(() -> {
                StepVerifier.create(Flux.empty()
                                .doOnCancel(() -> cancellations.increment()))
                        .expectError()
                        .verify();
            })
            .withMessageContaining("expected: onError(); actual: onComplete()");

    assertThat(cancellations.intValue())
            .overridingErrorMessage("the expectError assertion caused a cancellation")
            .isZero();
}
内容来源于网络,如有侵权,请联系作者删除!