reactor.core.publisher.Flux.sort()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.3k)|赞(0)|评价(0)|浏览(331)

本文整理了Java中reactor.core.publisher.Flux.sort()方法的一些代码示例,展示了Flux.sort()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.sort()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:sort

Flux.sort介绍

[英]Sort elements from this Flux by collecting and sorting them in the background then emitting the sorted sequence once this sequence completes. Each item emitted by the Flux must implement Comparable with respect to all other items in the sequence.

Note that calling sort with long, non-terminating or infinite sources might cause OutOfMemoryError. Use sequence splitting like #window to sort batches in that case.
[中]通过在后台收集和排序元素,然后在该序列完成后发出已排序的序列,从该流量中对元素进行排序。通量排放的每个项目必须与序列中的所有其他项目具有可比性。
请注意,使用长源、非终止源或无限源调用sort可能会导致OutOfMemoryError。在这种情况下,使用类似#窗口的序列拆分对批次进行排序。

代码示例

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Flux<InstanceEvent> findAll() {
  return Flux.defer(() -> Flux.fromIterable(eventLog.values())
                .flatMapIterable(Function.identity())
                .sort(byTimestampAndIdAndVersion));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

public CachingRouteLocator(RouteLocator delegate) {
  this.delegate = delegate;
  routes = CacheFlux.lookup(cache, "routes", Route.class)
      .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_streamFromIterable() {
  Flux<String> source = Flux.fromIterable(Arrays.asList("a","b"))
               .sort((a, b) -> { throw new IllegalStateException("boom"); });
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
                  .collect(Collectors.toSet()))
      .withMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_streamCreate() {
  Flux<String> source = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  })
      .sort((a, b) -> { throw new IllegalStateException("boom"); });
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
                  .collect(Collectors.toSet()))
      .withMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_iteratorFromCreate() {
  Iterator<String> it = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  }).sort((a, b) -> {
    throw new IllegalStateException("boom");
  }).toIterable().iterator();
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(it::hasNext)
      .withMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void aFluxCanBeSorted4(){
  StepVerifier.create(Flux.just(43, 32122, 422, 321, 43, 443311)
              .sort())
        .expectNext(43, 43, 321, 422, 32122, 443311)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void aFluxCanBeSorted3(){
  StepVerifier.create(Flux.just(43, 32122, 422, 321, 43, 443311)
              .sort(Comparator.reverseOrder()))
        .expectNext(443311, 32122, 422, 321, 43, 43)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void composeGroup() {
  Set<Integer> values = new ConcurrentSkipListSet<>();
  Flux<Integer> flux = Flux.range(1, 10)
               .parallel(3)
               .runOn(Schedulers.parallel())
               .doOnNext(values::add)
               .composeGroup(p -> p.log("rail" + p.key())
                         .map(i -> (p.key() + 1) * 100 + i))
               .sequential();
  StepVerifier.create(flux.sort())
        .assertNext(i -> assertThat(i - 100)
                      .isBetween(1, 10))
        .thenConsumeWhile(i -> i / 100 == 1)
        .assertNext(i -> assertThat(i - 200)
                      .isBetween(1, 10))
        .thenConsumeWhile(i -> i / 100 == 2)
        .assertNext(i -> assertThat(i - 300)
                      .isBetween(1, 10))
        .thenConsumeWhile(i -> i / 100 == 3)
        .verifyComplete();
  assertThat(values)
       .hasSize(10)
       .contains(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Flux#sort()
 */
public final Flux<T> sort() {
  return boxed.sort();
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param sortFunction
 * @return
 * @see reactor.core.publisher.Flux#sort(java.util.Comparator)
 */
public final Flux<T> sort(Comparator<? super T> sortFunction) {
  return boxed.sort(sortFunction);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

@Override
public ReactiveSeq<T> sorted() {
  return flux(flux.sort());
}

代码示例来源:origin: hantsy/angular-spring-reactive-sample

@GetMapping("")
public Flux<Post> all(@RequestParam(value = "q", required = false) String q,
  @RequestParam(value = "page", defaultValue = "0") long page,
  @RequestParam(value = "size", defaultValue = "10") long size) {
  return filterPublishedPostsByKeyword(q)
    .sort(comparing(Post::getCreatedDate).reversed())
    .skip(page * size).take(size);
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void testReactor() {
    for (int i = 1; i < 1000; i++) {
      final int j = i;
      int k = Flux.range(1, i).map(v -> j - v)
      .sort()
      .zipWith(Flux.range(1, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b))
      .collectList()
      .map(list -> list.size())
      .block();
      assertEquals(j, k);
    }
  }
}

代码示例来源:origin: learning-spring-boot/learning-spring-boot-2nd-edition-code

@Test
public void data6() {
  // tag::6[]
  Flux.just("alpha", "bravo", "charlie")
    .map(String::toUpperCase)
    .flatMap(s -> Flux.fromArray(s.split("")))
    .groupBy(String::toString)
    .sort((o1, o2) -> o1.key().compareTo(o2.key()))
    .flatMap(group -> Mono.just(group.key()).zipWith(group.count()))
    .map(keyAndCount -> keyAndCount.getT1() + " => " + keyAndCount.getT2())
    .subscribe(System.out::println);
  // end::6[]
}

相关文章

微信公众号

最新文章

更多

Flux类方法