
Reposted by x33g5p2x on 2022-01-19 in Other



Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
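The size-or-timeout rule described above can be modelled without Reactor. What follows is a minimal plain-Java sketch, not Reactor's implementation: the class `BufferTimeoutModel`, its `batch` method, and the explicit arrival timestamps are hypothetical names introduced for illustration. It assumes the timeout is measured from the first value placed in the open buffer, and that a partial buffer is flushed when the input ends.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of size-or-timeout batching. Hypothetical; not Reactor's implementation. */
final class BufferTimeoutModel {

    private BufferTimeoutModel() {}

    /**
     * Splits values into buffers; values[i] is assumed to arrive at arrivalMillis[i],
     * and arrival times are assumed to be non-decreasing.
     */
    static List<List<Integer>> batch(int[] values, long[] arrivalMillis,
                                     int maxSize, long maxTimeMillis) {
        if (values.length != arrivalMillis.length) {
            throw new IllegalArgumentException("one arrival time per value is required");
        }
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> open = new ArrayList<>();
        long openedAt = 0L;
        for (int i = 0; i < values.length; i++) {
            long now = arrivalMillis[i];
            // Timeout rule: the open buffer has waited maxTime since its first value.
            if (!open.isEmpty() && now - openedAt >= maxTimeMillis) {
                emitted.add(open);
                open = new ArrayList<>();
            }
            if (open.isEmpty()) {
                openedAt = now; // the first value of a buffer starts its timer
            }
            open.add(values[i]);
            // Size rule: the open buffer is full.
            if (open.size() == maxSize) {
                emitted.add(open);
                open = new ArrayList<>();
            }
        }
        if (!open.isEmpty()) {
            emitted.add(open); // flush the partial buffer at the end of the input
        }
        return emitted;
    }
}
```

In this model, six values arriving together with a maximum size of 5 and a 2000 ms timeout produce the buffers [1, 2, 3, 4, 5] and [6]. Three values arriving at 0 ms, 10 ms and 150 ms with a 100 ms timeout produce [1, 2] and [3].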


Code example source: reactor/reactor-core

/**
 * Collect incoming values into multiple {@link List} buffers that will be emitted
 * by the returned {@link Flux} each time the buffer reaches a maximum size OR the
 * maxTime {@link Duration} elapses.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @return a microbatched {@link Flux} of {@link List} delimited by given size or a given period timeout
 */
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
  return bufferTimeout(maxSize, maxTime, listSupplier());
}

Code example source: reactor/reactor-core

/**
 * Collect incoming values into multiple user-defined {@link Collection} buffers that
 * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
 * size OR the maxTime {@link Duration} elapses.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @param bufferSupplier a {@link Supplier} of the concrete {@link Collection} to use for each buffer
 * @param <C> the {@link Collection} buffer type
 * @return a microbatched {@link Flux} of {@link Collection} delimited by given size or a given period timeout
 */
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier) {
  return bufferTimeout(maxSize, maxTime, Schedulers.parallel(), bufferSupplier);
}
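The role of the `bufferSupplier` parameter can be illustrated in plain Java. The sketch below is not Reactor code: `BufferSupplierSketch` and `batchBySize` are hypothetical names, the timeout is left out to keep the focus on the supplier, and the sketch assumes a buffer is closed after `maxSize` values have been offered to it. It only shows how a `Supplier<C>` bounded as `C extends Collection<? super T>` lets the caller choose the concrete collection used for each buffer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java sketch of caller-supplied buffer collections. Hypothetical; not Reactor's implementation. */
final class BufferSupplierSketch {

    private BufferSupplierSketch() {}

    /** Groups the source into buffers of at most maxSize values, asking the supplier for each new buffer. */
    static <T, C extends Collection<? super T>> List<C> batchBySize(
            Iterable<T> source, int maxSize, Supplier<C> bufferSupplier) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<C> emitted = new ArrayList<>();
        C open = null;
        int count = 0; // values offered to the open buffer (an assumption of this sketch)
        for (T value : source) {
            if (open == null) {
                open = bufferSupplier.get(); // one fresh collection per buffer
            }
            open.add(value);
            count++;
            if (count == maxSize) {
                emitted.add(open);
                open = null;
                count = 0;
            }
        }
        if (open != null) {
            emitted.add(open); // flush the partial buffer at the end of the input
        }
        return emitted;
    }
}
```

Passing `TreeSet::new` as the supplier, for example, makes every emitted buffer a sorted set, while `ArrayList::new` keeps arrival order.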

Code example source: reactor/reactor-core

/**
 * Collect incoming values into multiple {@link List} buffers that will be emitted
 * by the returned {@link Flux} each time the buffer reaches a maximum size OR the
 * maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @param timer a time-capable {@link Scheduler} instance to run on
 * @return a microbatched {@link Flux} of {@link List} delimited by given size or a given period timeout
 */
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer) {
  return bufferTimeout(maxSize, maxTime, timer, listSupplier());
}
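Because the timeout is measured on a caller-provided timer, the clock driving the batching can be swapped out, for instance to make timing deterministic in tests. The following plain-Java sketch illustrates that idea only. `ManualTimerBatcher` and its methods are hypothetical names, not Reactor API, and the sketch assumes the timer is armed by the first value of each buffer and cancelled whenever the buffer is emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Size-or-timeout batcher driven by a manually advanced clock. Hypothetical; not Reactor's implementation. */
final class ManualTimerBatcher<T> {

    private final int maxSize;
    private final long maxTimeMillis;
    private final Consumer<List<T>> downstream;

    private List<T> open = new ArrayList<>();
    private long now = 0L;       // virtual time in milliseconds
    private long deadline = -1L; // -1 means no timer is armed

    ManualTimerBatcher(int maxSize, long maxTimeMillis, Consumer<List<T>> downstream) {
        this.maxSize = maxSize;
        this.maxTimeMillis = maxTimeMillis;
        this.downstream = downstream;
    }

    void onNext(T value) {
        if (open.isEmpty()) {
            deadline = now + maxTimeMillis; // the first value of a buffer arms the timer
        }
        open.add(value);
        if (open.size() == maxSize) {
            flush(); // size limit reached
        }
    }

    /** Moves the virtual clock forward and fires the timer if its deadline has passed. */
    void advanceTimeBy(long millis) {
        now += millis;
        if (deadline >= 0 && now >= deadline) {
            flush(); // timeout reached with a partial buffer
        }
    }

    void onComplete() {
        if (!open.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        List<T> full = open;
        open = new ArrayList<>();
        deadline = -1L; // cancel the timer for the emitted buffer
        downstream.accept(full);
    }
}
```

A caller would feed values with `onNext`, move time with `advanceTimeBy`, and observe that a partial buffer is released exactly when the virtual clock reaches the deadline rather than at some real-time instant.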

Code example source: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize() {
  return Flux.range(1, 6)
        .bufferTimeout(5, Duration.ofMillis(2000));
}

Code example source: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize2() {
  return Flux.range(1, 6)
        .bufferTimeout(5, Duration.ofMillis(2000));
}

Code example source: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow() {
  return Flux.range(1, 6)
        .bufferTimeout(5, Duration.ofMillis(100));
}

Code example source: reactor/reactor-core

public void scanOperator() {
  final Flux<List<Integer>> flux = Flux.just(1).bufferTimeout(3, Duration.ofSeconds(1));
  assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.parallel());
}

Code example source: reactor/reactor-core

.subscribe(stream -> stream.publishOn(asyncGroup)
             .bufferTimeout(1000 / 8, Duration.ofSeconds(1))
             .subscribe(batch -> {
               for (int j = 0; j < batch.size(); j++) {

Code example source: reactor/reactor-core

.subscribe(substream -> substream.hide().publishOn(asyncGroup)
               .bufferTimeout(BATCH_SIZE, Duration.ofMillis(TIMEOUT))
               .subscribe(items -> {

Code example source: reactor/reactor-core

public void rejectedOnNextLeadsToOnError() {
  Scheduler scheduler = Schedulers.newSingle("rejectedOnNextLeadsToOnError");
  StepVerifier.create(Flux.just(1, 2, 3)
              .bufferTimeout(4, Duration.ofMillis(500), scheduler))

Code example source: reactor/reactor-core

public void discardOnTimerRejected() {
  Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
  StepVerifier.create(Flux.just(1, 2, 3)
              .doOnNext(n -> scheduler.dispose())
              .bufferTimeout(10, Duration.ofMillis(100), scheduler))
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))

Code example source: reactor/reactor-core

public void discardOnError() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .bufferTimeout(10, Duration.ofMillis(100)))
        .hasDiscardedExactly(1, 2, 3);

Code example source: reactor/reactor-core

public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .bufferTimeout(10, Duration.ofMillis(100)))
        .hasDiscardedExactly(1, 2, 3);
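The discard behaviour exercised by the tests above (the currently open buffer is discarded on cancellation or error) can be modelled in plain Java. This is a sketch under stated assumptions, not Reactor's implementation: `DiscardingBatcher`, its `onDiscard` hook, and its method names are hypothetical, the timeout is omitted, and the sketch assumes that values already emitted in a full buffer are never discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Size-based batcher that hands the open buffer to a discard hook on error or cancel. Hypothetical sketch. */
final class DiscardingBatcher<T> {

    private final int maxSize;
    private final Consumer<List<T>> downstream;
    private final Consumer<T> onDiscard;

    private List<T> open = new ArrayList<>();
    private boolean terminated;

    DiscardingBatcher(int maxSize, Consumer<List<T>> downstream, Consumer<T> onDiscard) {
        this.maxSize = maxSize;
        this.downstream = downstream;
        this.onDiscard = onDiscard;
    }

    void onNext(T value) {
        if (terminated) {
            return; // signals after termination are ignored in this sketch
        }
        open.add(value);
        if (open.size() == maxSize) {
            List<T> full = open;
            open = new ArrayList<>();
            downstream.accept(full);
        }
    }

    /** An error terminates the sequence; buffered values are discarded, not emitted. */
    void onError(Throwable error) {
        discardOpenBuffer();
    }

    /** Cancellation is treated like an error with respect to the open buffer. */
    void cancel() {
        discardOpenBuffer();
    }

    private void discardOpenBuffer() {
        if (terminated) {
            return;
        }
        terminated = true;
        List<T> dropped = open;
        open = new ArrayList<>();
        dropped.forEach(onDiscard); // each buffered value reaches the discard hook once
    }
}
```

With a maximum size of 10, feeding 1, 2, 3 and then signalling an error leaves nothing emitted and passes 1, 2, 3 to the discard hook, which mirrors the expectation expressed by `hasDiscardedExactly(1, 2, 3)`.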

Code example source: reactor/reactor-core

public void shouldCorrectlyDispatchBatchedTimeout() throws InterruptedException {
  long timeout = 100;
  final int batchsize = 4;
  int parallelStreams = 16;
  CountDownLatch latch = new CountDownLatch(1);
  final EmitterProcessor<Integer> streamBatcher = EmitterProcessor.create();
         .bufferTimeout(batchsize, Duration.ofSeconds(timeout))
         .subscribe(innerStream -> innerStream.publishOn(asyncGroup)
                          .subscribe(i -> latch.countDown()));
  boolean finished = latch.await(2, TimeUnit.SECONDS);
  if (!finished) {
    throw new RuntimeException(latch.getCount()+"");
  }
  else {
    assertEquals("Must have correct latch number : " + latch.getCount(), latch.getCount(), 0);
  }
}

Code example source: reactor/reactor-core

public void discardOnFlushWithoutRequest() {
  TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW);
          .bufferTimeout(10, Duration.ofMillis(200)),
        .then(() -> testPublisher.emit(1, 2, 3))
        .hasDiscardedExactly(1, 2, 3);

Code example source: reactor/reactor-core

  public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");
//        System.out.println("Providing new downstream");
    FluxProcessor<Long, Long> p =


    BiFunction<Long, String, Long> combinator = (t1, t2) -> t1;
    return FluxProcessor.wrap(p,
        p.groupBy(k -> k % 2 == 0)
         .flatMap(stream -> stream.scan((prev, next) -> next)
                     .map(integer -> -integer)
                     .filter(integer -> integer <= 0)
                     .map(integer -> -integer)
                     .bufferTimeout(1024, Duration.ofMillis(50))
                     .doOnNext(array -> cumulated.getAndIncrement())
                     .flatMap(i ->,
         .doOnNext(array -> cumulatedJoin.getAndIncrement())

Code example source: reactor/reactor-core

  Flux<Integer> transformFlux(Flux<Integer> f) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");
//        System.out.println("Providing new downstream");

    Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);

    BiFunction<Integer, String, Integer> combinator = (t1, t2) -> t1;

    return f.publishOn(sharedGroup)
        .flatMap(stream -> stream.publishOn(asyncGroup)
                     .scan((prev, next) -> next)
                     .map(integer -> -integer)
                     .filter(integer -> integer <= 0)
                     .map(integer -> -integer)
                     .bufferTimeout(batch, Duration.ofMillis(50))
                     .flatMap(i ->, otherStream, combinator))
