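This article collects code examples for the Java method reactor.core.publisher.Flux.merge() and shows how Flux.merge() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.merge() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: merge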
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat(Publisher), sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
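The difference between eager subscription (merge) and sequential subscription (concat) can be seen in a minimal sketch. The class name MergeVsConcat, the two sources, and the delays are assumptions chosen for illustration: source A emits at roughly 200 ms and 400 ms, source B at roughly 300 ms and 500 ms.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Illustrative sketch only; names and timings are hypothetical.
public class MergeVsConcat {

    // Emits "A1" after about 200 ms and "A2" after about 400 ms.
    static Flux<String> sourceA() {
        return Flux.just("A1", "A2").delayElements(Duration.ofMillis(200));
    }

    // Same spacing, but subscription is delayed by 100 ms,
    // so "B1" arrives at about 300 ms and "B2" at about 500 ms.
    static Flux<String> sourceB() {
        return Flux.just("B1", "B2")
                   .delayElements(Duration.ofMillis(200))
                   .delaySubscription(Duration.ofMillis(100));
    }

    // merge subscribes to both sources up front and forwards
    // elements in arrival order, so the two sequences interleave.
    static List<String> merged() {
        return Flux.merge(sourceA(), sourceB()).collectList().block();
    }

    // concat subscribes to sourceB only after sourceA has completed,
    // so all of A's elements come before any of B's.
    static List<String> concatenated() {
        return Flux.concat(sourceA(), sourceB()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("merge:  " + merged());
        System.out.println("concat: " + concatenated());
    }
}
```

With these delays, merge should yield A1, B1, A2, B2, while concat yields A1, A2, B1, B2. The merged order depends on when elements arrive, so it is only predictable here because the delays are spaced well apart.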
Code example source: lettuce-io/lettuce-core
@Override
public Mono<String> flushall() {
    // Run the command on every master and return the last reply received.
    Map<String, Publisher<String>> publishers = executeOnMasters(RedisServerReactiveCommands::flushall);
    return Flux.merge(publishers.values()).last();
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<String> flushdb() {
    Map<String, Publisher<String>> publishers = executeOnMasters(RedisServerReactiveCommands::flushdb);
    return Flux.merge(publishers.values()).last();
}
Code example source: lettuce-io/lettuce-core
@Override
public Flux<K> keys(K pattern) {
    // The per-master key streams are merged into a single Flux.
    Map<String, Publisher<K>> publishers = executeOnMasters(commands -> commands.keys(pattern));
    return Flux.merge(publishers.values());
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<String> scriptFlush() {
    Map<String, Publisher<String>> publishers = executeOnNodes(RedisScriptingReactiveCommands::scriptFlush, ALL_NODES);
    return Flux.merge(publishers.values()).last();
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<Long> dbsize() {
    // Sum the per-master sizes into a single total.
    Map<String, Publisher<Long>> publishers = executeOnMasters(RedisServerReactiveCommands::dbsize);
    return Flux.merge(publishers.values()).reduce((accu, next) -> accu + next);
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<Long> del(Iterable<K> keys) {
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
    // Fewer than two slot partitions: no fan-out is needed.
    if (partitioned.size() < 2) {
        return super.del(keys);
    }
    // One command per slot partition; the merged counts are summed.
    List<Publisher<Long>> publishers = new ArrayList<>();
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
        publishers.add(super.del(entry.getValue()));
    }
    return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<Long> unlink(Iterable<K> keys) {
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
    if (partitioned.size() < 2) {
        return super.unlink(keys);
    }
    List<Publisher<Long>> publishers = new ArrayList<>();
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
        publishers.add(super.unlink(entry.getValue()));
    }
    return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}
Code example source: lettuce-io/lettuce-core
public Mono<Long> touch(Iterable<K> keys) {
    List<K> keyList = LettuceLists.newList(keys);
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
    if (partitioned.size() < 2) {
        return super.touch(keyList);
    }
    List<Publisher<Long>> publishers = new ArrayList<>();
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
        publishers.add(super.touch(entry.getValue()));
    }
    return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}
Code example source: lettuce-io/lettuce-core
public Mono<Long> exists(Iterable<K> keys) {
    List<K> keyList = LettuceLists.newList(keys);
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
    if (partitioned.size() < 2) {
        return super.exists(keyList);
    }
    List<Publisher<Long>> publishers = new ArrayList<>();
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
        publishers.add(super.exists(entry.getValue()));
    }
    return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<Long> keys(KeyStreamingChannel<K> channel, K pattern) {
    Map<String, Publisher<Long>> publishers = executeOnMasters(commands -> commands.keys(channel, pattern));
    return Flux.merge(publishers.values()).reduce((accu, next) -> accu + next);
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<Void> shutdown(boolean save) {
    // then() discards any elements and only propagates completion or error.
    Map<String, Publisher<Void>> publishers = executeOnNodes(commands -> commands.shutdown(save), ALL_NODES);
    return Flux.merge(publishers.values()).then();
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<String> scriptKill() {
    Map<String, Publisher<String>> publishers = executeOnNodes(RedisScriptingReactiveCommands::scriptKill, ALL_NODES);
    return Flux.merge(publishers.values()).onErrorReturn("OK").last();
}
Code example source: lettuce-io/lettuce-core
@Override
public Mono<String> scriptLoad(V script) {
    Map<String, Publisher<String>> publishers = executeOnNodes((commands) -> commands.scriptLoad(script), ALL_NODES);
    return Flux.merge(publishers.values()).last();
}
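The lettuce-core examples above share one pattern: fan a command out to several nodes, merge the per-node publishers, then collapse the merged sequence with reduce(), last(), or then(). The following self-contained sketch reproduces that pattern without a Redis cluster. The class FanOutAggregate and its helper methods are hypothetical stand-ins for lettuce's executeOnMasters / executeOnNodes, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative sketch only; the node ids and replies are made up.
public class FanOutAggregate {

    // Hypothetical stand-in: one numeric reply per node.
    static Map<String, Publisher<Long>> perNodeCounts() {
        Map<String, Publisher<Long>> publishers = new LinkedHashMap<>();
        publishers.put("node-1", Mono.just(3L));
        publishers.put("node-2", Mono.just(5L));
        publishers.put("node-3", Mono.just(7L));
        return publishers;
    }

    // Hypothetical stand-in: one status reply per node.
    static Map<String, Publisher<String>> perNodeStatus() {
        Map<String, Publisher<String>> publishers = new LinkedHashMap<>();
        publishers.put("node-1", Mono.just("OK"));
        publishers.put("node-2", Mono.just("OK"));
        return publishers;
    }

    // reduce(): combine all per-node numbers into a single total.
    static Mono<Long> total() {
        return Flux.merge(perNodeCounts().values()).reduce(Long::sum);
    }

    // last(): keep only the final reply of the merged sequence.
    static Mono<String> lastStatus() {
        return Flux.merge(perNodeStatus().values()).last();
    }

    // then(): ignore the values and signal only completion or error.
    static Mono<Void> allDone() {
        return Flux.merge(perNodeStatus().values()).then();
    }

    public static void main(String[] args) {
        System.out.println("total = " + total().block());
        System.out.println("last  = " + lastStatus().block());
        allDone().block();
    }
}
```

Because merge forwards elements in arrival order, which node's reply ends up as the last() element is not fixed when the sources are asynchronous; that is harmless when every node returns the same status. Note also that last() signals a NoSuchElementException on an empty sequence, whereas reduce() simply completes empty.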
Code example source: resilience4j/resilience4j
@RequestMapping(value = "stream/events", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getAllRateLimiterEventsStream() {
    // Merge the event streams of all rate limiters into one SSE stream.
    Seq<Flux<RateLimiterEvent>> eventStreams = rateLimiterRegistry.getAllRateLimiters()
            .map(rateLimiter -> toFlux(rateLimiter.getEventPublisher()));
    return RateLimiterEventsEmitter.createSseEmitter(Flux.merge(eventStreams));
}
Code example source: resilience4j/resilience4j
@RequestMapping(value = "stream/events", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getAllCircuitBreakerEventsStream() {
    Seq<Flux<CircuitBreakerEvent>> eventStreams = circuitBreakerRegistry.getAllCircuitBreakers()
            .map(circuitBreaker -> toFlux(circuitBreaker.getEventPublisher()));
    return CircuitBreakerEventEmitter.createSseEmitter(Flux.merge(eventStreams));
}
Code example source: reactor/reactor-core
@Test
public void sampleMergeMonoTest() throws Exception {
    CountDownLatch latch = new CountDownLatch(2);
    Flux<Integer> p = Flux.merge(Flux.<Integer>empty().next(), Mono.just(1))
                          .log("mono");
    awaitLatch(p, latch);
}
Code example source: reactor/reactor-core
@Test
public void mergeEmpty() {
    // Merging no sources completes immediately.
    StepVerifier.create(Flux.merge())
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void mergePublisherPublisherIterable() {
    StepVerifier.create(Flux.merge(Arrays.asList(Flux.just(1, 2), Flux.just(3, 4))))
                .expectNext(1, 2, 3, 4)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void mergePublisherPublisher() {
    AtomicLong request = new AtomicLong();
    StepVerifier.create(Flux.merge(Flux.just(Flux.just(1, 2), Flux.just(3, 4)).doOnRequest(request::set)))
                .expectNext(1, 2, 3, 4)
                .then(() -> assertThat(request.get()).isEqualTo(1))
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.merge(Flux.just(1),
               Flux.range(2, 2),
               Flux.just(4, 5, 6)
                   .hide())
        .subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6)
      .assertNoError()
      .assertComplete();
}