reactor.core.publisher.Flux.merge()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.0k)|赞(0)|评价(0)|浏览(477)

本文整理了Java中reactor.core.publisher.Flux.merge()方法的一些代码示例,展示了Flux.merge()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.merge()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:merge

Flux.merge介绍

[英]Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike #concat(Publisher), sources are subscribed to eagerly.

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
[中]将数组/vararg中包含的发布者序列中的数据合并到交错合并序列中。与#concat(Publisher)不同的是,资源被热切地订阅。
请注意,merge是为使用异步源或有限源而定制的。当处理尚未在专用调度程序上发布的无限源时,必须在其自己的调度程序中隔离该源,因为merge会在订阅另一个源之前尝试将其耗尽。

代码示例

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<String> flushall() {
  Map<String, Publisher<String>> publishers = executeOnMasters(RedisServerReactiveCommands::flushall);
  return Flux.merge(publishers.values()).last();
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<String> flushdb() {
  Map<String, Publisher<String>> publishers = executeOnMasters(RedisServerReactiveCommands::flushdb);
  return Flux.merge(publishers.values()).last();
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Flux<K> keys(K pattern) {
  Map<String, Publisher<K>> publishers = executeOnMasters(commands -> commands.keys(pattern));
  return Flux.merge(publishers.values());
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<String> scriptFlush() {
  Map<String, Publisher<String>> publishers = executeOnNodes(RedisScriptingReactiveCommands::scriptFlush, ALL_NODES);
  return Flux.merge(publishers.values()).last();
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Long> dbsize() {
  Map<String, Publisher<Long>> publishers = executeOnMasters(RedisServerReactiveCommands::dbsize);
  return Flux.merge(publishers.values()).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Long> del(Iterable<K> keys) {
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
  if (partitioned.size() < 2) {
    return super.del(keys);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.del(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Long> unlink(Iterable<K> keys) {
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
  if (partitioned.size() < 2) {
    return super.unlink(keys);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.unlink(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

public Mono<Long> touch(Iterable<K> keys) {
  List<K> keyList = LettuceLists.newList(keys);
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
  if (partitioned.size() < 2) {
    return super.touch(keyList);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.touch(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

public Mono<Long> exists(Iterable<K> keys) {
  List<K> keyList = LettuceLists.newList(keys);
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
  if (partitioned.size() < 2) {
    return super.exists(keyList);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.exists(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Long> keys(KeyStreamingChannel<K> channel, K pattern) {
  Map<String, Publisher<Long>> publishers = executeOnMasters(commands -> commands.keys(channel, pattern));
  return Flux.merge(publishers.values()).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Void> shutdown(boolean save) {
  Map<String, Publisher<Void>> publishers = executeOnNodes(commands -> commands.shutdown(save), ALL_NODES);
  return Flux.merge(publishers.values()).then();
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<String> scriptKill() {
  Map<String, Publisher<String>> publishers = executeOnNodes(RedisScriptingReactiveCommands::scriptFlush, ALL_NODES);
  return Flux.merge(publishers.values()).onErrorReturn("OK").last();
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<String> scriptLoad(V script) {
  Map<String, Publisher<String>> publishers = executeOnNodes((commands) -> commands.scriptLoad(script), ALL_NODES);
  return Flux.merge(publishers.values()).last();
}

代码示例来源:origin: resilience4j/resilience4j

@RequestMapping(value = "stream/events", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getAllRateLimiterEventsStream() {
  Seq<Flux<RateLimiterEvent>> eventStreams = rateLimiterRegistry.getAllRateLimiters()
    .map(rateLimiter -> toFlux(rateLimiter.getEventPublisher()));
  return RateLimiterEventsEmitter.createSseEmitter(Flux.merge(eventStreams));
}

代码示例来源:origin: resilience4j/resilience4j

@RequestMapping(value = "stream/events", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getAllCircuitBreakerEventsStream() {
  Seq<Flux<CircuitBreakerEvent>> eventStreams = circuitBreakerRegistry.getAllCircuitBreakers()
      .map(circuitBreaker -> toFlux(circuitBreaker.getEventPublisher()));
  return CircuitBreakerEventEmitter.createSseEmitter(Flux.merge(eventStreams));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void sampleMergeMonoTest() throws Exception {
  CountDownLatch latch = new CountDownLatch(2);
  Flux<Integer> p = Flux.merge(Flux.<Integer>empty().next(), Mono.just(1))
                 .log("mono");
  awaitLatch(p, latch);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void mergeEmpty(){
  StepVerifier.create(Flux.merge())
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void mergePublisherPublisherIterable(){
  StepVerifier.create(Flux.merge(Arrays.asList(Flux.just(1, 2), Flux.just(3, 4))))
        .expectNext(1, 2, 3, 4)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void mergePublisherPublisher(){
  AtomicLong request = new AtomicLong();
  StepVerifier.create(Flux.merge(Flux.just(Flux.just(1, 2), Flux.just(3, 4)).doOnRequest(request::set)))
        .expectNext(1, 2, 3, 4)
        .then(() -> assertThat(request.get()).isEqualTo(1) )
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.merge(Flux.just(1),
      Flux.range(2, 2),
      Flux.just(4, 5, 6)
        .hide())
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6)
   .assertNoError()
   .assertComplete();
}

相关文章

微信公众号

最新文章

更多

Flux类方法