[英]Take only the first N values from this Flux, if available.

If N is zero, the resulting Flux completes as soon as this Fluxsignals its first value (which is not not relayed, though).

Note that this operator doesn't manipulate the backpressure requested amount. Rather, it merely lets requests from downstream propagate as is and cancels once N elements have been emitted. As a result, the source could produce a lot of extraneous elements in the meantime. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using #limitRequest(long) instead.


代码示例来源:origin: spring-projects/spring-framework

public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream).
      concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));

代码示例来源:origin: spring-projects/spring-framework

@GetMapping(path = "/spr16869", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  Flux<String> sseFlux() {
    return Flux.interval(Duration.ofSeconds(1)).take(3)
        .map(aLong -> String.format("event%d", aLong));

代码示例来源:origin: org.springframework/spring-core

public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream).
      concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));

代码示例来源:origin: spring-projects/spring-framework

@GetMapping(produces = "text/event-stream")
Flux<Person> getPersonStream() {
  return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50)
      .map(index -> new Person("N" + index));

代码示例来源:origin: spring-projects/spring-framework

 * Return an interval stream of N number of ticks and buffer the emissions
 * to avoid back pressure failures (e.g. on slow CI server).
 * <p>Use this method as follows:
 * <ul>
 * <li>Tests that verify N number of items followed by verifyOnComplete()
 * should set the number of emissions to N.
 * <li>Tests that verify N number of items followed by thenCancel() should
 * set the number of buffered to an arbitrary number greater than N.
 * </ul>
public static Flux<Long> testInterval(Duration period, int count) {
  return Flux.interval(period).take(count).onBackpressureBuffer(count);

代码示例来源:origin: spring-projects/spring-framework

Flux<ServerSentEvent<Person>> sse() {
  return INTERVAL.take(2).map(l ->
      ServerSentEvent.builder(new Person("foo " + l))
          .comment("bar " + l)

代码示例来源:origin: spring-projects/spring-framework

public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  this.client.execute(getUrl("/echo"), session -> session
  assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));

代码示例来源:origin: spring-projects/spring-framework

protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.error(new InputException()));
  Flux<String> result = this.decoder.decode(input, outputType, mimeType, hints);

代码示例来源:origin: spring-projects/spring-framework

parts = (adapter.isMultiValue() ? parts : parts.take(1));
return Mono.just(adapter.fromPublisher(parts));

代码示例来源:origin: spring-projects/spring-framework

protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.error(new InputException()));
  Flux<Resource> result = this.decoder.decode(input, outputType, mimeType, hints);

代码示例来源:origin: spring-projects/spring-framework

public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  String path = request.getURI().getPath();
  switch (path) {
    case "/write-and-flush":
      return response.writeAndFlushWith(
          testInterval(Duration.ofMillis(50), 2)
              .map(longValue -> wrap("data" + longValue + "\n", response))
    case "/write-and-complete":
      return response.writeWith(
          chunks1K().take(64).map(s -> wrap(s, response)));
    case "/write-and-never-complete":
      // Reactor requires at least 50 to flush, Tomcat/Undertow 8, Jetty 1
      return response.writeWith(
          chunks1K().take(64).map(s -> wrap(s, response)).mergeWith(Flux.never()));
      return response.writeWith(Flux.empty());

代码示例来源:origin: spring-projects/spring-framework

 * Test a {@link Decoder#decodeToMono decode} scenario where the input stream contains an error.
 * This test method will feed the first element of the {@code input} stream to the decoder,
 * followed by an {@link InputException}.
 * The result is expected to contain the error.
 * @param input the input to be provided to the decoder
 * @param outputType the desired output type
 * @param mimeType the mime type to use for decoding. May be {@code null}.
 * @param hints the hints used for decoding. May be {@code null}.
 * @see InputException
protected void testDecodeToMonoError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.error(new InputException()));
  Mono<?> result = this.decoder.decodeToMono(input, outputType, mimeType, hints);

代码示例来源:origin: spring-projects/spring-framework

 * Test a {@link Decoder#decode decode} scenario where the input stream contains an error.
 * This test method will feed the first element of the {@code input} stream to the decoder,
 * followed by an {@link InputException}.
 * The result is expected to contain one "normal" element, followed by the error.
 * @param input the input to be provided to the decoder
 * @param outputType the desired output type
 * @param mimeType the mime type to use for decoding. May be {@code null}.
 * @param hints the hints used for decoding. May be {@code null}.
 * @see InputException
protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.error(new InputException()));
  Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints);

代码示例来源:origin: reactor/reactor-core

public void contextTest() {
  StepVerifier.create(Flux.create(s -> IntStream.range(0, 10).forEach(i ->
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

代码示例来源:origin: spring-projects/spring-framework

 * Test a {@link Encoder#encode encode} scenario where the input stream contains an error.
 * This test method will feed the first element of the {@code input} stream to the encoder,
 * followed by an {@link InputException}.
 * The result is expected to contain one "normal" element, followed by the error.
 * @param input the input to be provided to the encoder
 * @param inputType the input type
 * @param mimeType the mime type to use for decoding. May be {@code null}.
 * @param hints the hints used for decoding. May be {@code null}.
 * @see InputException
protected void testEncodeError(Publisher<?> input, ResolvableType inputType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  input = Flux.concat(
      Flux.error(new InputException()));
  Flux<DataBuffer> result = encoder().encode(input, this.bufferFactory, inputType,
      mimeType, hints);

代码示例来源:origin: reactor/reactor-core

public void verifyVirtualTimeNoEventInterval() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(3))

代码示例来源:origin: reactor/reactor-core

public void contextTest() {
  StepVerifier.create(Flux.generate(s ->
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

代码示例来源:origin: reactor/reactor-core

public void onSubscribeRaceRequestingShouldBeConsistentForTakeConditionalTest() throws InterruptedException {
  for (int i = 0; i < 5; i++) {
    int take = 3000;
    RaceSubscriber<Integer> actual = new RaceSubscriber<>(take);
    Flux.range(0, Integer.MAX_VALUE)
      .filter(e -> true)
    actual.await(5, TimeUnit.SECONDS);

代码示例来源:origin: reactor/reactor-core

public void innerCancellationCancelsMainSequence() {
  StepVerifier.create(Flux.just("red", "green", "#", "black", "white")
              .windowWhile(s -> !s.equals("#"))
              .flatMap(w -> w.take(1)))

代码示例来源:origin: reactor/reactor-core

public void takeZero() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)




