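This article collects code examples for the Java method reactor.core.publisher.Flux.doOnNext() and shows how the method is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms. They were extracted from selected projects and should serve as a useful reference. Details of the Flux.doOnNext() method: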
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: doOnNext
Description: Add behavior (side-effect) triggered when the Flux emits an item.
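As a minimal sketch of the description above, the following standalone program suggests how doOnNext observes each element without changing what flows downstream. The class name DoOnNextBasics and the helper run() are hypothetical, and the example assumes reactor-core is on the classpath; block() is used only to keep the demonstration short.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnNextBasics {

    // Hypothetical helper: records what doOnNext sees, then the final result.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Integer> result = Flux.just(1, 2, 3)
                // Side effect only: the element itself is passed on unchanged
                .doOnNext(i -> log.add("saw " + i))
                .map(i -> i * 10)
                .collectList()
                .block(); // blocking is for demonstration only
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because doOnNext is placed before map, it sees the original values 1, 2, 3, while the collected result holds the mapped values 10, 20, 30.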
Code example source: origin: spring-projects/spring-framework
@Override
public Flux<DataBuffer> encode(Publisher<? extends DataBuffer> inputStream,
        DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
        @Nullable Map<String, Object> hints) {
    Flux<DataBuffer> flux = Flux.from(inputStream);
    // Attach the logging side effect only when debug logging is enabled and not suppressed
    if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
        flux = flux.doOnNext(buffer -> {
            String logPrefix = Hints.getLogPrefix(hints);
            logger.debug(logPrefix + "Writing " + buffer.readableByteCount() + " bytes");
        });
    }
    return flux;
}
Code example source: origin: spring-projects/spring-framework
@Override
public Flux<WebSocketMessage> receive() {
    return getDelegate().getInbound()
            .aggregateFrames(this.maxFramePayloadLength)
            .receiveFrames()
            .map(super::toMessage)
            // Trace-log every inbound message without altering the stream
            .doOnNext(message -> {
                if (logger.isTraceEnabled()) {
                    logger.trace(getLogPrefix() + "Received " + message);
                }
            });
}
Code example source: origin: spring-projects/spring-framework
@PostMapping("/flux")
public Mono<Void> createWithFlux(@RequestBody Flux<Person> flux) {
    // Store each Person as it arrives, then signal completion only
    return flux.doOnNext(persons::add).then();
}
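The doOnNext(persons::add).then() pattern in the controller method above can be tried outside Spring. The sketch below is an illustration under assumptions: the class DoOnNextThen, the helper run(), and the sample names are hypothetical, and reactor-core is assumed to be on the classpath. It also hints at why nothing is stored until the returned Mono is subscribed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DoOnNextThen {

    // Hypothetical helper: reports the list size before subscription and its content after.
    static String run() {
        List<String> persons = new CopyOnWriteArrayList<>();

        // Assembling the pipeline does not run it
        Mono<Void> saved = Flux.just("Ada", "Linus")
                .doOnNext(persons::add) // side effect: store each element
                .then();                // drop the elements, keep only completion or error

        int before = persons.size();    // no subscription yet, so nothing was added
        saved.block();                  // subscribing triggers the side effects
        return before + " -> " + persons;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a WebFlux controller the framework subscribes to the returned Mono, so no explicit block() is needed there.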
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
    Flux<WebSocketFrame> frames = Flux.from(messages)
            // Trace-log every outbound message before it is converted to a frame
            .doOnNext(message -> {
                if (logger.isTraceEnabled()) {
                    logger.trace(getLogPrefix() + "Sending " + message);
                }
            })
            .map(this::toFrame);
    return getDelegate().getOutbound()
            .options(NettyPipeline.SendOptions::flushOnEach)
            .sendObject(frames)
            .then();
}
Code example source: origin: spring-projects/spring-framework
@PostMapping("/publisher")
public Publisher<Void> createWithPublisher(@RequestBody Publisher<Person> publisher) {
    return Flux.from(publisher).doOnNext(persons::add).then();
}
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(WebSocketSession session) {
    // Use retain() for Reactor Netty
    return session.send(session.receive().doOnNext(WebSocketMessage::retain));
}
Code example source: origin: spring-projects/spring-framework
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
        @Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
    if (publisher != null && publisherNested != null) {
        throw new IllegalArgumentException("At most one publisher expected");
    }
    this.publisher = publisher != null ?
            Flux.from(publisher)
                    .doOnSubscribe(s -> this.hasContentConsumer = true)
                    // Copy every buffer that passes through into the recording buffer
                    .doOnNext(this.buffer::write)
                    .doOnError(this::handleOnError)
                    .doOnCancel(this::handleOnComplete)
                    .doOnComplete(this::handleOnComplete) : null;
    this.publisherNested = publisherNested != null ?
            Flux.from(publisherNested)
                    .doOnSubscribe(s -> this.hasContentConsumer = true)
                    .map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
                    .doOnError(this::handleOnError)
                    .doOnCancel(this::handleOnComplete)
                    .doOnComplete(this::handleOnComplete) : null;
    if (publisher == null && publisherNested == null) {
        this.content.onComplete();
    }
}
Code example source: origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_touchAndAssertState() {
    return Arrays.asList(scenario(f -> f.doOnNext(d -> {
    })));
}
Code example source: origin: spring-projects/spring-framework
@Test
public void sessionClosing() throws Exception {
    this.client.execute(getUrl("/close"),
            session -> {
                logger.debug("Starting..");
                return session.receive()
                        .doOnNext(s -> logger.debug("inbound " + s))
                        .then()
                        .doFinally(signalType -> {
                            logger.debug("Completed with: " + signalType);
                        });
            })
            .block(TIMEOUT);
}
Code example source: origin: reactor/reactor-core
void simpleFlux(){
    Flux.just(1)
        .map(d -> d + 1)
        // An exception thrown inside doOnNext is propagated downstream as an error signal
        .doOnNext(d -> {throw new RuntimeException("test");})
        .collectList()
        .onErrorReturn(Collections.singletonList(2))
        .block();
}
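The simpleFlux example above relies on an exception thrown inside doOnNext turning into an error signal. The following sketch is a hedged variation on that idea rather than code from reactor-core: the class DoOnNextError and the helper run() are hypothetical, reactor-core is assumed to be on the classpath, and onErrorResume is swapped in for onErrorReturn to make the point at which the sequence stops visible.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnNextError {

    // Hypothetical helper: the callback fails on the second element.
    static List<Integer> run() {
        return Flux.just(1, 2, 3)
                .doOnNext(i -> {
                    if (i == 2) {
                        // Thrown exceptions are delivered downstream as onError
                        throw new IllegalStateException("boom");
                    }
                })
                // Replace the failed sequence with a single fallback value
                .onErrorResume(e -> Flux.just(-1))
                .collectList()
                .block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Element 1 passes through before the failure, element 3 is never emitted because the source is cancelled on error, and the fallback value -1 ends the resulting list.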
Code example source: origin: spring-projects/spring-framework
@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
    return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory))
            // Log each parsed part, masking its content unless request-detail logging is enabled
            .doOnNext(part -> {
                if (!Hints.isLoggingSuppressed(hints)) {
                    LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
                            (isEnableLoggingRequestDetails() ?
                            LogFormatUtils.formatValue(part, !traceOn) :
                            "parts '" + part.name() + "' (content masked)"));
                }
            });
}
Code example source: origin: reactor/reactor-core
@Test
public void filterAllOut() {
    final int[] calls = { 0 };
    StepVerifier.create(
            Flux.range(1, 1000)
                // doOnNext sits upstream of the filter, so it counts all 1000 elements
                .doOnNext(v -> calls[0]++)
                .filterWhen(v -> Mono.just(false), 16)
                .flatMap(ignore -> Flux.just(0)))
            .verifyComplete();
    assertThat(calls[0]).isEqualTo(1000);
}
Code example source: origin: reactor/reactor-core
@Test
public void filterAllOutHidden() {
    final int[] calls = { 0 };
    StepVerifier.create(
            Flux.range(1, 1000)
                .doOnNext(v -> calls[0]++)
                .filterWhen(v -> Mono.just(false).hide(), 16)
                .flatMap(ignore -> Flux.just(0)))
            .verifyComplete();
    assertThat(calls[0]).isEqualTo(1000);
}
Code example source: origin: reactor/reactor-core
@Test
public void discardOnTimerRejected() {
    Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
    StepVerifier.create(Flux.just(1, 2, 3)
                            // Dispose the scheduler as soon as an element arrives
                            .doOnNext(n -> scheduler.dispose())
                            .bufferTimeout(10, Duration.ofMillis(100), scheduler))
                .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))
                .verifyThenAssertThat()
                .hasDiscardedExactly(1);
}
Code example source: origin: reactor/reactor-core
@Test
public void syncFusionAvailable() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 2)
        .doOnNext(v -> {
        })
        .subscribe(ts);
    Subscription s = ts.upstream();
    Assert.assertTrue("Non-fuseable upstream: " + s,
            s instanceof Fuseable.QueueSubscription);
}
Code example source: origin: reactor/reactor-core
@Test
public void callableReturnsNullFused2() {
    StepVerifier.create(Mono.fromCallable(() -> null)
                            .flux()
                            .subscribeOn(Schedulers.single())
                            .doOnNext(v -> System.out.println(v)), 1)
                .expectFusion(Fuseable.ASYNC)
                .thenRequest(1)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void publishOnFilter() throws Exception {
    Flux<Long> flux = Flux.interval(Duration.ofMillis(2)).take(255)
                          .publishOn(scheduler)
                          .filter(t -> true)
                          .doOnNext(i -> onNext(i))
                          .doOnError(e -> onError(e));
    verifyRejectedExecutionConsistency(flux, 5);
}