Usage and code examples of the reactor.core.publisher.Flux.collectSortedList() method

x33g5p2x, reposted on 2022-01-19 under "Other"

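This article collects code examples for the Java method reactor.core.publisher.Flux.collectSortedList() and shows how Flux.collectSortedList() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are intended as practical reference material. Details of the method:
Package: reactor.core.publisher
Class name: Flux
Method name: collectSortedList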

Introduction to Flux.collectSortedList

Collect all elements emitted by this Flux until this sequence completes, and then sort them in natural order into a List that is emitted by the resulting Mono.

Code examples

Code example origin: reactor/reactor-core

/**
 * Collect all elements emitted by this {@link Flux} until this sequence completes,
 * and then sort them in natural order into a {@link List} that is emitted by the
 * resulting {@link Mono}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/collectSortedList.svg" alt="">
 *
 * @return a {@link Mono} of a sorted {@link List} of all values from this {@link Flux}, in natural order
 */
public final Mono<List<T>> collectSortedList() {
  return collectSortedList(null);
}
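
The no-argument overload above simply passes `null` as the comparator. One plausible reading, assuming the comparator overload ends up sorting the collected list with `java.util.List.sort` (the excerpt does not show that method body), is that `null` selects natural ordering, as it does in the JDK. The sketch below uses only the JDK, not Reactor; `NullComparatorSketch` and `sortedCopy` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// JDK-only sketch (no Reactor): shows how a null comparator selects natural ordering.
// NullComparatorSketch and sortedCopy are hypothetical names, not part of any library.
public class NullComparatorSketch {

    // Copies the input and sorts the copy; a null comparator means natural order.
    public static <T> List<T> sortedCopy(List<T> input, Comparator<? super T> comparator) {
        List<T> copy = new ArrayList<>(input);
        copy.sort(comparator); // List.sort(null) relies on the elements being Comparable
        return copy;
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(43, 32122, 422, 321, 43, 443311);
        // Natural (ascending) order, selected by passing null.
        System.out.println(sortedCopy(values, null));
        // Explicit comparator, here descending order.
        System.out.println(sortedCopy(values, Comparator.reverseOrder()));
    }
}
```

If the elements are not mutually comparable, `List.sort(null)` throws `ClassCastException`, which matches the `@throws` note in the `sort()` Javadoc quoted in this article.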

Code example origin: reactor/reactor-core

/**
 * Sort elements from this {@link Flux} using a {@link Comparator} function, by
 * collecting and sorting elements in the background then emitting the sorted sequence
 * once this sequence completes.
 *
 * <p>Note that calling {@code sort} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}.
 *
 * @param sortFunction a function that compares two items emitted by this {@link Flux}
 * to indicate their sort order
 * @return a sorted {@link Flux}
 */
public final Flux<T> sort(Comparator<? super T> sortFunction) {
  return collectSortedList(sortFunction).flatMapIterable(identityFunction());
}

Code example origin: reactor/reactor-core

/**
 * Sort elements from this {@link Flux} by collecting and sorting them in the background
 * then emitting the sorted sequence once this sequence completes.
 * Each item emitted by the {@link Flux} must implement {@link Comparable} with
 * respect to all other items in the sequence.
 *
 * <p>Note that calling {@code sort} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}. Use sequence splitting like {@link #window} to sort batches in that case.
 *
 * @throws ClassCastException if any item emitted by the {@link Flux} does not implement
 * {@link Comparable} with respect to all other items emitted by the {@link Flux}
 * @return a sorted {@link Flux}
 */
public final Flux<T> sort(){
  return collectSortedList().flatMapIterable(identityFunction());
}
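
The Javadoc above suggests splitting a long sequence (for example with `window`) and sorting each batch instead of the whole stream. As a rough illustration of that trade-off, the JDK-only sketch below sorts fixed-size chunks of a list independently: the memory needed per sort is bounded by the chunk size, but the result is ordered only within each batch, not globally. It does not use Reactor, and `BatchSortSketch` and `sortInBatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only analogy for "sort batches instead of the whole sequence".
// BatchSortSketch and sortInBatches are hypothetical names, not Reactor API.
public class BatchSortSketch {

    // Splits the source into consecutive chunks of at most batchSize elements
    // and sorts each chunk in natural order, independently of the others.
    public static <T extends Comparable<? super T>> List<List<T>> sortInBatches(
            List<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < source.size(); start += batchSize) {
            int end = Math.min(start + batchSize, source.size());
            List<T> batch = new ArrayList<>(source.subList(start, end));
            batch.sort(null); // natural order within this batch only
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Each inner list is sorted, but the batches are not merged into one global order.
        System.out.println(sortInBatches(List.of(5, 3, 4, 2, 1), 2));
    }
}
```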

Code example origin: reactor/reactor-core

@Test
public void aFluxCanBeSorted(){
  List<Integer> vals = Flux.just(43, 32122, 422, 321, 43, 443311)
               .collectSortedList()
               .block();
  assertThat(vals).containsExactly(43, 43, 321, 422, 32122, 443311);
}

Code example origin: reactor/reactor-core

@Test
public void aFluxCanBeSorted2(){
  List<Integer> vals = Flux.just(1, 2, 3, 4)
               .collectSortedList(Comparator.reverseOrder())
               .block();
  assertThat(vals).containsExactly(4,3,2,1);
}

Code example origin: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_workaroundFlux() {
  Flux<String> source = Flux.<String>create(sink -> {
        sink.next("a");
        sink.next("b");
        sink.complete();
      })
      // The comparator always throws, so sorting fails once the source completes.
      .collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
      .hide()
      .flatMapIterable(Function.identity());

  StepVerifier.create(source)
      .expectErrorSatisfies(e -> assertThat(e)
          .isInstanceOf(IllegalStateException.class)
          .hasMessage("boom"))
      .verify();
}

Code example origin: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_workaroundStream() {
  Flux<String> source = Flux.<String>create(sink -> {
        sink.next("a");
        sink.next("b");
        sink.complete();
      })
      .collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
      .hide()
      .flatMapIterable(Function.identity());

  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
          .collect(Collectors.toSet()))
      .withMessage("boom");
}

Code example origin: reactor/reactor-core

@Test
public void sampleTest() throws Exception {
  CountDownLatch latch = new CountDownLatch(1);
  Disposable top10every1second =
      Flux.fromIterable(PULP_SAMPLE)
          .publishOn(asyncGroup)
          .flatMap(samuelJackson ->
              Flux.fromArray(samuelJackson.split(" "))
                  .publishOn(asyncGroup)
                  .filter(w -> !w.trim().isEmpty())
                  .doOnNext(i -> simulateLatency()))
          .window(Duration.ofSeconds(2))
          .flatMap(s -> s.groupBy(w -> w)
              .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
              // Sort the (word, count) tuples by count, highest first.
              .collectSortedList((a, b) -> -a.getT2().compareTo(b.getT2()))
              .flatMapMany(Flux::fromIterable)
              .take(10)
              .doAfterTerminate(() -> LOG.info("------------------------ window terminated" +
                  "----------------------")))
          .subscribe(
              entry -> LOG.info(entry.getT1() + ": " + entry.getT2()),
              error -> LOG.error("", error),
              latch::countDown);
  awaitLatch(top10every1second, latch);
}
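
The comparator in the pipeline above, `(a, b) -> -a.getT2().compareTo(b.getT2())`, orders the (word, count) tuples by count in descending order, and `take(10)` then keeps the ten most frequent words of each window. The same ranking step can be sketched with plain JDK collections. This is an illustration only: `Map.Entry` stands in for Reactor's `Tuple2`, the alphabetical tie-break is an addition that the original test does not have, and `TopWordsSketch` and `topN` are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only sketch of "count words, sort by count descending, keep the top N".
// TopWordsSketch and topN are hypothetical names, not Reactor API.
public class TopWordsSketch {

    public static List<Map.Entry<String, Long>> topN(List<String> words, int n) {
        // Count occurrences of each word (the role of groupBy + count in the test).
        Map<String, Long> counts = words.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        return counts.entrySet().stream()
                // Highest count first; ties are broken alphabetically (an assumption
                // added here so that the result is deterministic).
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n) // the role of take(10) in the test
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(topN(List.of("a", "b", "a", "c", "b", "a"), 2));
    }
}
```

Using `Comparator.reverseOrder()` avoids negating the result of `compareTo`, which would overflow if an implementation ever returned `Integer.MIN_VALUE`.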

Code example origin: reactor/reactor-core

@Test
public void analyticsTest() throws Exception {
  ReplayProcessor<Integer> source = ReplayProcessor.create();
  long avgTime = 50L;
  Mono<Long> result = source
      .log("delay")
      .publishOn(asyncGroup)
      .delayElements(Duration.ofMillis(avgTime))
      .elapsed()
      .skip(1)
      .groupBy(w -> w.getT1())
      .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
      .log("elapsed")
      // Sort the (elapsed time, count) pairs by elapsed time before reducing.
      .collectSortedList(Comparator.comparing(Tuple2::getT1))
      .flatMapMany(Flux::fromIterable)
      .reduce(-1L, (acc, next) -> acc > 0L ? ((next.getT1() + acc) / 2) : next.getT1())
      .log("reduced-elapsed")
      .cache();
  source.subscribe();
  for (int j = 0; j < 10; j++) {
    source.onNext(1);
  }
  source.onComplete();
  Assert.assertTrue(result.block(Duration.ofSeconds(5)) >= avgTime * 0.6);
}

Code example origin: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Flux#collectSortedList()
 */
public final Mono<List<T>> collectSortedList() {
  return boxed.collectSortedList();
}

Code example origin: com.aol.cyclops/cyclops-reactor

/**
 * @param comparator
 * @return
 * @see reactor.core.publisher.Flux#collectSortedList(java.util.Comparator)
 */
public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator) {
  return boxed.collectSortedList(comparator);
}

Code example origin: org.cloudfoundry/cloudfoundry-operations

private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, String applicationId, Boolean recent) {
  if (Optional.ofNullable(recent).orElse(false)) {
    return requestLogsRecent(dopplerClient, applicationId)
      .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
      .map(Envelope::getLogMessage)
      .collectSortedList(LOG_MESSAGE_COMPARATOR)
      .flatMapIterable(d -> d);
  } else {
    return requestLogsStream(dopplerClient, applicationId)
      .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
      .map(Envelope::getLogMessage)
      .compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
  }
}
