Usage and code examples of the reactor.core.publisher.Flux.take() method

x33g5p2x, reposted on 2022-01-19 under Other

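This article collects code examples of the Java method reactor.core.publisher.Flux.take() and shows how Flux.take() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.take() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: take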

Introduction to Flux.take

Take only the first N values from this Flux, if available.

If N is zero, the resulting Flux completes as soon as this Flux signals its first value (which is not relayed, though).

Note that this operator doesn't manipulate the backpressure requested amount. Rather, it merely lets requests from downstream propagate as is and cancels once N elements have been emitted. As a result, the source could produce a lot of extraneous elements in the meantime. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using limitRequest(long) instead.
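
To make the pass-through behavior described above concrete, here is a minimal sketch that models it with the JDK's java.util.concurrent.Flow interfaces rather than with Reactor itself. It is not Reactor's implementation: the TakeSketch class, its take and run methods, and the Report type are hypothetical names chosen for illustration, and the sketch assumes N >= 1 and a single-threaded, synchronous source. The operator hands the upstream subscription to downstream untouched, so the source sees the full downstream demand, and it cancels upstream once N items have been relayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Simplified, single-threaded model of the take(n) behavior described above.
// All names are hypothetical; this is NOT Reactor's implementation.
public class TakeSketch {

    /** What the demo observed. */
    public static final class Report {
        public final List<Long> received = new ArrayList<>();
        public long demandSeenBySource;
        public boolean sourceCancelled;
        public boolean completed;
    }

    /** Relays the first n items (assumes n >= 1), then cancels upstream and completes. */
    public static <T> Flow.Publisher<T> take(Flow.Publisher<T> upstream, long n) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            private long remaining = n;
            private boolean done;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                // Hand the upstream subscription straight to downstream:
                // request(...) amounts pass through as-is, not capped at n.
                downstream.onSubscribe(s);
            }

            @Override public void onNext(T item) {
                if (done) {
                    return; // drop anything the source emits after the cutoff
                }
                downstream.onNext(item);
                if (--remaining == 0) {
                    done = true;
                    subscription.cancel(); // stop the source after n items
                    downstream.onComplete();
                }
            }

            @Override public void onError(Throwable t) {
                if (!done) { done = true; downstream.onError(t); }
            }

            @Override public void onComplete() {
                if (!done) { done = true; downstream.onComplete(); }
            }
        });
    }

    /** Subscribes with unbounded demand to a counting source wrapped in take(n). */
    public static Report run(long n) {
        Report report = new Report();

        // Synchronous source: emits 0, 1, 2, ... until demand is met or it is cancelled.
        Flow.Publisher<Long> source = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long next = 0;

            @Override public void request(long demand) {
                report.demandSeenBySource = demand;
                for (long i = 0; i < demand && !report.sourceCancelled; i++) {
                    subscriber.onNext(Long.valueOf(next++));
                }
            }

            @Override public void cancel() {
                report.sourceCancelled = true;
            }
        });

        take(source, n).subscribe(new Flow.Subscriber<Long>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Long item) { report.received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { report.completed = true; }
        });
        return report;
    }

    public static void main(String[] args) {
        Report r = run(3);
        System.out.println("received = " + r.received);
        System.out.println("demand seen by source = " + r.demandSeenBySource);
        System.out.println("source cancelled = " + r.sourceCancelled);
    }
}
```

In this sketch the source receives the downstream's unbounded request even though only three items are wanted. Because the source is synchronous, it stops as soon as it is cancelled; a source that prefetches or emits from another thread could already have produced extra elements by then, which is the case the limitRequest(long) advice is aimed at.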

Code examples

Code example source: spring-projects/spring-framework

@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream).
      take(1).
      concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}

Code example source: spring-projects/spring-framework

@GetMapping(path = "/spr16869", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> sseFlux() {
  return Flux.interval(Duration.ofSeconds(1)).take(3)
      .map(aLong -> String.format("event%d", aLong));
}

Code example source: spring-projects/spring-framework

@GetMapping(produces = "text/event-stream")
Flux<Person> getPersonStream() {
  return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50)
      .map(index -> new Person("N" + index));
}

Code example source: spring-projects/spring-framework

/**
 * Return an interval stream of N number of ticks and buffer the emissions
 * to avoid back pressure failures (e.g. on slow CI server).
 *
 * <p>Use this method as follows:
 * <ul>
 * <li>Tests that verify N number of items followed by verifyOnComplete()
 * should set the number of emissions to N.
 * <li>Tests that verify N number of items followed by thenCancel() should
 * set the number of buffered to an arbitrary number greater than N.
 * </ul>
 */
public static Flux<Long> testInterval(Duration period, int count) {
  return Flux.interval(period).take(count).onBackpressureBuffer(count);
}

Code example source: spring-projects/spring-framework

@GetMapping("/event")
Flux<ServerSentEvent<Person>> sse() {
  return INTERVAL.take(2).map(l ->
      ServerSentEvent.builder(new Person("foo " + l))
          .id(Long.toString(l))
          .comment("bar " + l)
          .build());
}

Code example source: spring-projects/spring-framework

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  this.client.execute(getUrl("/echo"), session -> session
      .send(input.map(session::textMessage))
      .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
      .subscribeWith(output)
      .then())
      .block(TIMEOUT);
  assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}

Code example source: spring-projects/spring-framework

@Override
protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.from(input).take(1),
      Flux.error(new InputException()));
  Flux<String> result = this.decoder.decode(input, outputType, mimeType, hints);
  StepVerifier.create(result)
      .expectError(InputException.class)
      .verify();
}

Code example source: spring-projects/spring-framework

parts = (adapter.isMultiValue() ? parts : parts.take(1));
return Mono.just(adapter.fromPublisher(parts));

Code example source: spring-projects/spring-framework

@Override
protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.from(input).take(1),
      Flux.error(new InputException()));
  Flux<Resource> result = this.decoder.decode(input, outputType, mimeType, hints);
  StepVerifier.create(result)
      .expectError(InputException.class)
      .verify();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  String path = request.getURI().getPath();
  switch (path) {
    case "/write-and-flush":
      return response.writeAndFlushWith(
          testInterval(Duration.ofMillis(50), 2)
              .map(longValue -> wrap("data" + longValue + "\n", response))
              .map(Flux::just)
              .mergeWith(Flux.never()));
    case "/write-and-complete":
      return response.writeWith(
          chunks1K().take(64).map(s -> wrap(s, response)));
    case "/write-and-never-complete":
      // Reactor requires at least 50 to flush, Tomcat/Undertow 8, Jetty 1
      return response.writeWith(
          chunks1K().take(64).map(s -> wrap(s, response)).mergeWith(Flux.never()));
    default:
      return response.writeWith(Flux.empty());
  }
}

Code example source: spring-projects/spring-framework

/**
 * Test a {@link Decoder#decodeToMono decode} scenario where the input stream contains an error.
 * This test method will feed the first element of the {@code input} stream to the decoder,
 * followed by an {@link InputException}.
 * The result is expected to contain the error.
 *
 * @param input the input to be provided to the decoder
 * @param outputType the desired output type
 * @param mimeType the mime type to use for decoding. May be {@code null}.
 * @param hints the hints used for decoding. May be {@code null}.
 * @see InputException
 */
protected void testDecodeToMonoError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.from(input).take(1),
      Flux.error(new InputException()));
  Mono<?> result = this.decoder.decodeToMono(input, outputType, mimeType, hints);
  StepVerifier.create(result)
      .expectError(InputException.class)
      .verify();
}

Code example source: spring-projects/spring-framework

/**
 * Test a {@link Decoder#decode decode} scenario where the input stream contains an error.
 * This test method will feed the first element of the {@code input} stream to the decoder,
 * followed by an {@link InputException}.
 * The result is expected to contain one "normal" element, followed by the error.
 *
 * @param input the input to be provided to the decoder
 * @param outputType the desired output type
 * @param mimeType the mime type to use for decoding. May be {@code null}.
 * @param hints the hints used for decoding. May be {@code null}.
 * @see InputException
 */
protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.from(input).take(1),
      Flux.error(new InputException()));
  Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints);
  StepVerifier.create(result)
      .expectNextCount(1)
      .expectError(InputException.class)
      .verify();
}

Code example source: reactor/reactor-core

@Test
public void contextTest() {
  StepVerifier.create(
          Flux.create(s -> IntStream.range(0, 10).forEach(i -> s.next(
                  s.currentContext().get(AtomicInteger.class).incrementAndGet())))
              .take(10)
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class, new AtomicInteger())))
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .verifyComplete();
}

Code example source: spring-projects/spring-framework

/**
 * Test a {@link Encoder#encode encode} scenario where the input stream contains an error.
 * This test method will feed the first element of the {@code input} stream to the encoder,
 * followed by an {@link InputException}.
 * The result is expected to contain one "normal" element, followed by the error.
 *
 * @param input the input to be provided to the encoder
 * @param inputType the input type
 * @param mimeType the mime type to use for decoding. May be {@code null}.
 * @param hints the hints used for decoding. May be {@code null}.
 * @see InputException
 */
protected void testEncodeError(Publisher<?> input, ResolvableType inputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.from(input).take(1),
      Flux.error(new InputException()));
  Flux<DataBuffer> result = encoder().encode(input, this.bufferFactory, inputType,
      mimeType, hints);
  StepVerifier.create(result)
      .consumeNextWith(DataBufferUtils::release)
      .expectError(InputException.class)
      .verify();
}

Code example source: reactor/reactor-core

@Test
public void verifyVirtualTimeNoEventInterval() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(3)).take(2))
      .expectSubscription()
      .expectNoEvent(Duration.ofSeconds(3))
      .expectNext(0L)
      .expectNoEvent(Duration.ofSeconds(3))
      .expectNext(1L)
      .expectComplete()
      .verify();
}

Code example source: reactor/reactor-core

@Test
public void contextTest() {
  StepVerifier.create(
          Flux.generate(s -> s.next(
                  s.currentContext().get(AtomicInteger.class).incrementAndGet()))
              .take(10)
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class, new AtomicInteger())))
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void onSubscribeRaceRequestingShouldBeConsistentForTakeConditionalTest() throws InterruptedException {
  for (int i = 0; i < 5; i++) {
    int take = 3000;
    RaceSubscriber<Integer> actual = new RaceSubscriber<>(take);
    Flux.range(0, Integer.MAX_VALUE)
      .take(take)
      .filter(e -> true)
      .subscribe(actual);
    actual.await(5, TimeUnit.SECONDS);
  }
}

Code example source: reactor/reactor-core

@Test
public void innerCancellationCancelsMainSequence() {
  StepVerifier.create(Flux.just("red", "green", "#", "black", "white")
              .log()
              .windowWhile(s -> !s.equals("#"))
              .flatMap(w -> w.take(1)))
        .expectNext("red")
        .thenCancel()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void takeZero() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)
    .take(0)
    .subscribe(ts);
  ts.assertNoValues()
   .assertComplete()
   .assertNoError();
}
