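This article collects code examples for the Java method reactor.core.publisher.Flux.compose() and shows how Flux.compose() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. The details of Flux.compose() are as follows: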
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: compose
Description: Defers the transformation of this Flux in order to generate a target Flux type. The transformation function is applied once for each Subscriber, at subscription time. For instance:
flux.compose(original -> original.log());
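The difference between a deferred and an eager transformation can be illustrated without Reactor on the classpath. The following is a minimal plain-JDK sketch, not Reactor's implementation: MiniFlux, just, map, transform, compose, collect and addCounter are all hypothetical names introduced here for illustration. It models only the one property described above, namely that the function passed to compose runs again for every subscriber. (In Reactor 3.3 and later the same operator is exposed as transformDeferred.)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class DeferredTransformSketch {

    /** Hypothetical stand-in for a cold publisher: every subscribe() replays the items. */
    interface MiniFlux<T> {
        void subscribe(Consumer<? super T> subscriber);

        static <T> MiniFlux<T> just(List<T> items) {
            return subscriber -> items.forEach(subscriber);
        }

        default <R> MiniFlux<R> map(Function<? super T, ? extends R> mapper) {
            return subscriber -> subscribe(item -> subscriber.accept(mapper.apply(item)));
        }

        /** Eager variant: the function runs exactly once, when the chain is assembled. */
        default <R> MiniFlux<R> transform(Function<? super MiniFlux<T>, ? extends MiniFlux<R>> fn) {
            return fn.apply(this);
        }

        /** Deferred variant: the function runs again each time someone subscribes. */
        default <R> MiniFlux<R> compose(Function<? super MiniFlux<T>, ? extends MiniFlux<R>> fn) {
            return subscriber -> fn.apply(this).subscribe(subscriber);
        }
    }

    /** Subscribes once and gathers everything that was emitted. */
    static List<Integer> collect(MiniFlux<Integer> flux) {
        List<Integer> out = new ArrayList<>();
        flux.subscribe(out::add);
        return out;
    }

    /** Adds a running counter to each element; the counter is created when this function runs. */
    static MiniFlux<Integer> addCounter(MiniFlux<Integer> f) {
        AtomicInteger counter = new AtomicInteger();
        return f.map(v -> v + counter.incrementAndGet());
    }

    public static void main(String[] args) {
        MiniFlux<Integer> source = MiniFlux.just(Arrays.asList(10));
        MiniFlux<Integer> deferred = source.compose(DeferredTransformSketch::addCounter);
        MiniFlux<Integer> eager = source.transform(DeferredTransformSketch::addCounter);

        System.out.println(collect(deferred)); // [11]  fresh counter
        System.out.println(collect(deferred)); // [11]  fresh counter again
        System.out.println(collect(eager));    // [11]  shared counter, first use
        System.out.println(collect(eager));    // [12]  shared counter, second use
    }
}
```

With the deferred variant, state created inside the function is private to each subscriber. With the eager variant, the same state is shared by all subscribers.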
Code example source: codecentric/spring-boot-admin
public void start() {
    subscription = Flux.from(publisher)
        .log(log.getName(), Level.FINEST)
        .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
        .ofType(eventType)
        .cast(eventType)
        // handle(...) is applied at subscription time because compose defers it
        .compose(this::handle)
        .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
        .subscribe();
}
Code example source: reactor/reactor-core
@Test
public void advancedTransform() {
    AtomicInteger ai = new AtomicInteger();
    // The function runs once per subscription: the first subscriber drops
    // "orange", every later subscriber drops "purple".
    Function<Flux<String>, Flux<String>> filterAndMap = f -> {
        if (ai.incrementAndGet() == 1) {
            return f.filter(color -> !color.equals("orange"))
                    .map(String::toUpperCase);
        }
        return f.filter(color -> !color.equals("purple"))
                .map(String::toUpperCase);
    };
    Flux<String> composedFlux =
        Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
            .doOnNext(System.out::println)
            .compose(filterAndMap);
    composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter: " + d));
    composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: " + d));
}
Code example source: reactor/reactor-core
@Test
public void perSubscriberState() {
    // A fresh AtomicInteger is created for every subscriber,
    // so each of them observes 10 + 1 = 11.
    Flux<Integer> source = Flux.just(10).compose(f -> {
        AtomicInteger value = new AtomicInteger();
        return f.map(v -> v + value.incrementAndGet());
    });
    for (int i = 0; i < 10; i++) {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        source.subscribe(ts);
        ts.assertValues(11)
          .assertComplete()
          .assertNoError();
    }
}
Code example source: reactor/reactor-core
@Test
public void composerThrows() {
    // An exception thrown by the function reaches each subscriber as an error signal.
    Flux<Integer> source = Flux.just(10).compose(f -> {
        throw new RuntimeException("Forced failure");
    });
    for (int i = 0; i < 10; i++) {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        source.subscribe(ts);
        ts.assertNoValues()
          .assertNotComplete()
          .assertError(RuntimeException.class);
    }
}
Code example source: reactor/reactor-core
@Test
public void composerReturnsNull() {
    // A null result from the function is reported to each subscriber as a NullPointerException.
    Flux<Integer> source = Flux.just(10).compose(f -> {
        return null;
    });
    for (int i = 0; i < 10; i++) {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        source.subscribe(ts);
        ts.assertNoValues()
          .assertNotComplete()
          .assertError(NullPointerException.class);
    }
}
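The composerThrows and composerReturnsNull tests above assert that a failing function does not blow up when the chain is assembled; instead, every subscriber receives an error. The sketch below models that routing in plain JDK code under stated assumptions: MiniPublisher, MiniSubscriber, just, compose and observe are hypothetical names, and the null-check message is made up for illustration. It is not Reactor's actual implementation.

```java
import java.util.Objects;
import java.util.function.Function;

class ComposeErrorSketch {

    /** Hypothetical subscriber with separate channels for values and failures. */
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Hypothetical publisher: work only starts when subscribe() is called. */
    interface MiniPublisher<T> {
        void subscribe(MiniSubscriber<? super T> subscriber);
    }

    static <T> MiniPublisher<T> just(T value) {
        return subscriber -> subscriber.onNext(value);
    }

    /**
     * Deferred transformation: fn runs inside subscribe(), so anything it throws
     * (or a null result) is turned into an onError call for that subscriber.
     */
    static <T, R> MiniPublisher<R> compose(
            MiniPublisher<T> source,
            Function<? super MiniPublisher<T>, ? extends MiniPublisher<R>> fn) {
        return subscriber -> {
            MiniPublisher<R> target;
            try {
                target = Objects.requireNonNull(fn.apply(source), "transformer returned null");
            } catch (RuntimeException e) {
                subscriber.onError(e);
                return;
            }
            target.subscribe(subscriber);
        };
    }

    /** Subscribes once and describes what the subscriber saw. */
    static <T> String observe(MiniPublisher<T> publisher) {
        StringBuilder seen = new StringBuilder();
        publisher.subscribe(new MiniSubscriber<T>() {
            @Override public void onNext(T value) {
                seen.append("value:").append(value);
            }
            @Override public void onError(Throwable error) {
                seen.append("error:").append(error.getClass().getSimpleName());
            }
        });
        return seen.toString();
    }

    public static void main(String[] args) {
        MiniPublisher<Integer> ten = just(10);
        // Building the chains does not throw; the function has not run yet.
        MiniPublisher<Integer> throwing = compose(ten, f -> { throw new RuntimeException("Forced failure"); });
        MiniPublisher<Integer> returningNull = compose(ten, f -> null);
        MiniPublisher<Integer> identity = compose(ten, f -> f);

        System.out.println(observe(throwing));      // error:RuntimeException
        System.out.println(observe(returningNull)); // error:NullPointerException
        System.out.println(observe(identity));      // value:10
    }
}
```

Because the function is invoked per subscription, subscribing ten times (as the tests do) would produce the same error ten times.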
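Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param transformer function that maps this Flux to the target Publisher, applied for each Subscriber
 * @return the Flux produced by the transformer
 * @see reactor.core.publisher.Flux#compose(java.util.function.Function)
 */
public final <V> Flux<V> compose(Function<? super Flux<T>, ? extends Publisher<V>> transformer) {
    // Delegates to the wrapped Flux.
    return boxed.compose(transformer);
}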
Code example source: spring-cloud/spring-cloud-function (the same snippet also ships in org.springframework.cloud/spring-cloud-function-context)
if (supplier instanceof FluxSupplier) {
    FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
    return (Supplier<Mono<Void>>) () -> Mono.from(supplier.get().compose(v -> fConsumer.apply(supplier.get())));
Code example source: org.cloudfoundry/cloudfoundry-operations
private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, String applicationId, Boolean recent) {
    if (Optional.ofNullable(recent).orElse(false)) {
        // Recent logs are a finite set, so they can be collected and sorted as a whole.
        return requestLogsRecent(dopplerClient, applicationId)
            .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
            .map(Envelope::getLogMessage)
            .collectSortedList(LOG_MESSAGE_COMPARATOR)
            .flatMapIterable(d -> d);
    } else {
        // Streamed logs are sorted by the transformer returned from SortingUtils.timespan.
        return requestLogsStream(dopplerClient, applicationId)
            .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
            .map(Envelope::getLogMessage)
            .compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
    }
}
Code example source: akarnokd/akarnokd-misc
@Test
public void errorDedicated() {
    Flux.error(new IOException())
        .compose(composeIfNonEmptyDedicated(f -> {
            System.out.println("Composed!");
            return f.doOnNext(System.out::println).then();
        }))
        .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}
Code example source: akarnokd/akarnokd-misc
@Test
public void error() {
    Flux.error(new IOException())
        .compose(composeIfNonEmpty(f -> {
            System.out.println("Composed!");
            return f.doOnNext(System.out::println).then();
        }))
        .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}
Code example source: akarnokd/akarnokd-misc
public static void main(String[] args) {
    Flux.just(1, 2, 3, 6, 7, 10)
        // Emit each value v after a delay of v seconds.
        .flatMap(v -> Mono.delay(Duration.ofMillis(v * 1000))
                          .doOnNext(w -> System.out.println("T=" + v))
                          .map(w -> v))
        .compose(f -> delayedBufferAfterFirst(f, Duration.ofSeconds(2)))
        .doOnNext(System.out::println)
        .blockLast();
}
Code example source: akarnokd/akarnokd-misc
@Test
public void simpleDedicated() throws Exception {
    Flux.range(1, 10)
        .hide()
        .compose(composeIfNonEmptyDedicated(f -> {
            System.out.println("Composed!");
            return f.doOnNext(System.out::println).then();
        }))
        .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}
Code example source: akarnokd/akarnokd-misc
@Test
public void simple() throws Exception {
    Flux.range(1, 10)
        .hide()
        .compose(composeIfNonEmpty(f -> {
            System.out.println("Composed!");
            return f.doOnNext(System.out::println).then();
        }))
        .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}