Usage and code examples of the reactor.core.publisher.Flux.doOnNext() method

x33g5p2x, reposted on 2022-01-19 in Other

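This article collects Java code examples for the reactor.core.publisher.Flux.doOnNext() method and shows how Flux.doOnNext() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flux.doOnNext() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: doOnNext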

Introduction to Flux.doOnNext

Add behavior (side-effect) triggered when the Flux emits an item.
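As a minimal sketch of the description above, the following example records each element in a side list while the element itself continues downstream unchanged. The class name DoOnNextSketch and the method timesTen are hypothetical names chosen for illustration, and the code assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical illustration class; not taken from any of the projects quoted in this article.
public class DoOnNextSketch {

    // Emits 1, 2, 3; doOnNext records each element in 'seen' as a side effect,
    // then map() transforms the same (unmodified) element downstream.
    static List<Integer> timesTen(List<Integer> seen) {
        return Flux.range(1, 3)
                .doOnNext(seen::add)   // side effect only: the element is passed on as-is
                .map(i -> i * 10)
                .collectList()
                .block();              // blocking is acceptable here because this is a demo
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        List<Integer> result = timesTen(seen);
        System.out.println("seen by doOnNext: " + seen);
        System.out.println("downstream result: " + result);
    }
}
```

The callback sees each element before map() runs, which is why doOnNext is commonly used for logging or bookkeeping rather than for transforming values.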

Code examples

Code example source: spring-projects/spring-framework

@Override
public Flux<DataBuffer> encode(Publisher<? extends DataBuffer> inputStream,
    DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
    @Nullable Map<String, Object> hints) {
  Flux<DataBuffer> flux = Flux.from(inputStream);
  if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
    flux = flux.doOnNext(buffer -> {
      String logPrefix = Hints.getLogPrefix(hints);
      logger.debug(logPrefix + "Writing " + buffer.readableByteCount() + " bytes");
    });
  }
  return flux;
}

Code example source: spring-projects/spring-framework

@Override
public Flux<WebSocketMessage> receive() {
  return getDelegate().getInbound()
      .aggregateFrames(this.maxFramePayloadLength)
      .receiveFrames()
      .map(super::toMessage)
      .doOnNext(message -> {
        if (logger.isTraceEnabled()) {
          logger.trace(getLogPrefix() + "Received " + message);
        }
      });
}

Code example source: spring-projects/spring-framework

@PostMapping("/flux")
public Mono<Void> createWithFlux(@RequestBody Flux<Person> flux) {
  return flux.doOnNext(persons::add).then();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
  Flux<WebSocketFrame> frames = Flux.from(messages)
      .doOnNext(message -> {
        if (logger.isTraceEnabled()) {
          logger.trace(getLogPrefix() + "Sending " + message);
        }
      })
      .map(this::toFrame);
  return getDelegate().getOutbound()
      .options(NettyPipeline.SendOptions::flushOnEach)
      .sendObject(frames)
      .then();
}

Code example source: spring-projects/spring-framework

@PostMapping("/publisher")
public Publisher<Void> createWithPublisher(@RequestBody Publisher<Person> publisher) {
  return Flux.from(publisher).doOnNext(persons::add).then();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(WebSocketSession session) {
  // Use retain() for Reactor Netty
  return session.send(session.receive().doOnNext(WebSocketMessage::retain));
}

Code example source: spring-projects/spring-framework

public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
    @Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
  if (publisher != null && publisherNested != null) {
    throw new IllegalArgumentException("At most one publisher expected");
  }
  this.publisher = publisher != null ?
      Flux.from(publisher)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .doOnNext(this.buffer::write)
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  this.publisherNested = publisherNested != null ?
      Flux.from(publisherNested)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  if (publisher == null && publisherNested == null) {
    this.content.onComplete();
  }
}

Code example source: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_touchAndAssertState() {
  return Arrays.asList(scenario(f -> f.doOnNext(d -> {
  })));
}

Code example source: spring-projects/spring-framework

@Test
public void sessionClosing() throws Exception {
  this.client.execute(getUrl("/close"),
      session -> {
        logger.debug("Starting..");
        return session.receive()
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doFinally(signalType -> {
              logger.debug("Completed with: " + signalType);
            });
      })
      .block(TIMEOUT);
}

Code example source: reactor/reactor-core

void simpleFlux(){
  Flux.just(1)
    .map(d -> d + 1)
    .doOnNext(d -> {throw new RuntimeException("test");})
    .collectList()
    .onErrorReturn(Collections.singletonList(2))
    .block();
}

Code example source: spring-projects/spring-framework

@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
  return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory))
      .doOnNext(part -> {
        if (!Hints.isLoggingSuppressed(hints)) {
          LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
              (isEnableLoggingRequestDetails() ?
                  LogFormatUtils.formatValue(part, !traceOn) :
                  "parts '" + part.name() + "' (content masked)"));
        }
      });
}

Code example source: reactor/reactor-core

@Test
public void filterAllOut() {
  final int[] calls = { 0 };
  StepVerifier.create(
      Flux.range(1, 1000)
        .doOnNext(v -> calls[0]++)
        .filterWhen(v -> Mono.just(false), 16)
        .flatMap(ignore -> Flux.just(0)))
        .verifyComplete();
  assertThat(calls[0]).isEqualTo(1000);
}

Code example source: reactor/reactor-core

@Test
public void filterAllOutHidden() {
  final int[] calls = { 0 };
  StepVerifier.create(
      Flux.range(1, 1000)
        .doOnNext(v -> calls[0]++)
        .filterWhen(v -> Mono.just(false).hide(), 16)
        .flatMap(ignore -> Flux.just(0)))
        .verifyComplete();
  assertThat(calls[0]).isEqualTo(1000);
}

Code example source: reactor/reactor-core

@Test
public void discardOnTimerRejected() {
  Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
  StepVerifier.create(Flux.just(1, 2, 3)
              .doOnNext(n -> scheduler.dispose())
              .bufferTimeout(10, Duration.ofMillis(100), scheduler))
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))
        .verifyThenAssertThat()
        .hasDiscardedExactly(1);
}

Code example source: reactor/reactor-core

@Test
public void syncFusionAvailable() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .doOnNext(v -> {
    })
    .subscribe(ts);
  Subscription s = ts.upstream();
  Assert.assertTrue("Non-fuseable upstream: " + s,
      s instanceof Fuseable.QueueSubscription);
}

Code example source: reactor/reactor-core

@Test
public void callableReturnsNullFused2() {
  StepVerifier.create(Mono.fromCallable(() -> null)
              .flux()
              .subscribeOn(Schedulers.single())
              .doOnNext(v -> System.out.println(v)), 1)
        .expectFusion(Fuseable.ASYNC)
        .thenRequest(1)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void publishOnFilter() throws Exception {
  Flux<Long> flux = Flux.interval(Duration.ofMillis(2)).take(255)
             .publishOn(scheduler)
             .filter(t -> true)
             .doOnNext(i -> onNext(i))
             .doOnError(e -> onError(e));
  verifyRejectedExecutionConsistency(flux, 5);
}
