本文整理了Java中reactor.core.publisher.Flux.collectSortedList()
方法的一些代码示例,展示了Flux.collectSortedList()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.collectSortedList()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:collectSortedList
[英]Collect all elements emitted by this Flux until this sequence completes, and then sort them in natural order into a List that is emitted by the resulting Mono.
[中]收集此通量发出的所有元素,直到此序列完成,然后按自然顺序将它们排序到由结果Mono发出的列表中。
代码示例来源:origin: reactor/reactor-core
/**
* Collect all elements emitted by this {@link Flux} until this sequence completes,
* and then sort them in natural order into a {@link List} that is emitted by the
* resulting {@link Mono}.
*
* <p>
* <img class="marble" src="doc-files/marbles/collectSortedList.svg" alt="">
*
* @return a {@link Mono} of a sorted {@link List} of all values from this {@link Flux}, in natural order
*/
public final Mono<List<T>> collectSortedList() {
return collectSortedList(null);
}
代码示例来源:origin: reactor/reactor-core
/**
* Sort elements from this {@link Flux} using a {@link Comparator} function, by
* collecting and sorting elements in the background then emitting the sorted sequence
* once this sequence completes.
*
* <p>Note that calling {@code sort} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}
*
* @param sortFunction a function that compares two items emitted by this {@link Flux}
* to indicate their sort order
* @return a sorted {@link Flux}
*/
public final Flux<T> sort(Comparator<? super T> sortFunction) {
return collectSortedList(sortFunction).flatMapIterable(identityFunction());
}
代码示例来源:origin: reactor/reactor-core
/**
* Sort elements from this {@link Flux} by collecting and sorting them in the background
* then emitting the sorted sequence once this sequence completes.
* Each item emitted by the {@link Flux} must implement {@link Comparable} with
* respect to all other items in the sequence.
*
* <p>Note that calling {@code sort} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}. Use sequence splitting like {@link #window} to sort batches in that case.
*
* @throws ClassCastException if any item emitted by the {@link Flux} does not implement
* {@link Comparable} with respect to all other items emitted by the {@link Flux}
* @return a sorted {@link Flux}
*/
public final Flux<T> sort(){
return collectSortedList().flatMapIterable(identityFunction());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void aFluxCanBeSorted(){
List<Integer> vals = Flux.just(43, 32122, 422, 321, 43, 443311)
.collectSortedList()
.block();
assertThat(vals).containsExactly(43, 43, 321, 422, 32122, 443311);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void aFluxCanBeSorted2(){
List<Integer> vals = Flux.just(1, 2, 3, 4)
.collectSortedList(Comparator.reverseOrder())
.block();
assertThat(vals).containsExactly(4,3,2,1);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_workaroundFlux() {
Flux<String> source = Flux.<String>create(sink -> {
sink.next("a");
sink.next("b");
sink.complete();
})
.collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
.hide()
.flatMapIterable(Function.identity());
StepVerifier.create(source)
.expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
.hasMessage("boom"))
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_workaroundStream() {
Flux<String> source = Flux.<String>create(sink -> {
sink.next("a");
sink.next("b");
sink.complete();
})
.collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
.hide()
.flatMapIterable(Function.identity());
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> source.toStream()
.collect(Collectors.toSet()))
.withMessage("boom");
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sampleTest() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Disposable top10every1second =
Flux.fromIterable(PULP_SAMPLE)
.publishOn(asyncGroup)
.flatMap(samuelJackson ->
Flux
.fromArray(samuelJackson.split(" "))
.publishOn(asyncGroup)
.filter(w -> !w.trim().isEmpty())
.doOnNext(i -> simulateLatency())
)
.window(Duration.ofSeconds(2))
.flatMap(s -> s.groupBy(w -> w)
.flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
.collectSortedList((a, b) -> -a.getT2().compareTo(b.getT2()))
.flatMapMany(Flux::fromIterable)
.take(10)
.doAfterTerminate(() -> LOG.info("------------------------ window terminated" +
"----------------------"))
)
.subscribe(
entry -> LOG.info(entry.getT1() + ": " + entry.getT2()),
error -> LOG.error("", error),
latch::countDown
);
awaitLatch(top10every1second, latch);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void analyticsTest() throws Exception {
ReplayProcessor<Integer> source = ReplayProcessor.create();
long avgTime = 50l;
Mono<Long> result = source
.log("delay")
.publishOn(asyncGroup)
.delayElements(Duration.ofMillis(avgTime))
.elapsed()
.skip(1)
.groupBy(w -> w.getT1())
.flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
.log("elapsed")
.collectSortedList(Comparator.comparing(Tuple2::getT1))
.flatMapMany(Flux::fromIterable)
.reduce(-1L, (acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) : next.getT1())
.log("reduced-elapsed")
.cache();
source.subscribe();
for (int j = 0; j < 10; j++) {
source.onNext(1);
}
source.onComplete();
Assert.assertTrue(result.block(Duration.ofSeconds(5)) >= avgTime * 0.6);
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @return
* @see reactor.core.publisher.Flux#collectSortedList()
*/
public final Mono<List<T>> collectSortedList() {
return boxed.collectSortedList();
}
/**
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param comparator
* @return
* @see reactor.core.publisher.Flux#collectSortedList(java.util.Comparator)
*/
public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator) {
return boxed.collectSortedList(comparator);
}
/**
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Collect all elements emitted by this {@link Flux} until this sequence completes,
* and then sort them in natural order into a {@link List} that is emitted by the
* resulting {@link Mono}.
*
* <p>
* <img class="marble" src="doc-files/marbles/collectSortedList.svg" alt="">
*
* @return a {@link Mono} of a sorted {@link List} of all values from this {@link Flux}, in natural order
*/
public final Mono<List<T>> collectSortedList() {
return collectSortedList(null);
}
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Sort elements from this {@link Flux} using a {@link Comparator} function, by
* collecting and sorting elements in the background then emitting the sorted sequence
* once this sequence completes.
*
* <p>Note that calling {@code sort} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}
*
* @param sortFunction a function that compares two items emitted by this {@link Flux}
* to indicate their sort order
* @return a sorted {@link Flux}
*/
public final Flux<T> sort(Comparator<? super T> sortFunction) {
return collectSortedList(sortFunction).flatMapIterable(identityFunction());
}
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Sort elements from this {@link Flux} by collecting and sorting them in the background
* then emitting the sorted sequence once this sequence completes.
* Each item emitted by the {@link Flux} must implement {@link Comparable} with
* respect to all other items in the sequence.
*
* <p>Note that calling {@code sort} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}. Use sequence splitting like {@link #window} to sort batches in that case.
*
* @throws ClassCastException if any item emitted by the {@link Flux} does not implement
* {@link Comparable} with respect to all other items emitted by the {@link Flux}
* @return a sorted {@link Flux}
*/
public final Flux<T> sort(){
return collectSortedList().flatMapIterable(identityFunction());
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, String applicationId, Boolean recent) {
if (Optional.ofNullable(recent).orElse(false)) {
return requestLogsRecent(dopplerClient, applicationId)
.filter(e -> EventType.LOG_MESSAGE == e.getEventType())
.map(Envelope::getLogMessage)
.collectSortedList(LOG_MESSAGE_COMPARATOR)
.flatMapIterable(d -> d);
} else {
return requestLogsStream(dopplerClient, applicationId)
.filter(e -> EventType.LOG_MESSAGE == e.getEventType())
.map(Envelope::getLogMessage)
.compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
}
}
内容来源于网络,如有侵权,请联系作者删除!