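This article collects code examples for the Java method reactor.core.publisher.Flux.cache() and shows how Flux.cache() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they can serve as practical reference material. Details of Flux.cache() are as follows: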
Fully qualified class: reactor.core.publisher.Flux
Class name: Flux
Method name: cache
Description: Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
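To make the replay behaviour in the description above concrete, here is a minimal plain-JDK sketch. The class `ReplayAllSketch` and its methods are hypothetical names introduced only for illustration; this is a simplified model of "retain every signal and replay it, including completion, to later subscribers", not Reactor's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of an unbounded replay cache; not Reactor's implementation. */
final class ReplayAllSketch {
    // Every signal seen so far; unbounded, like cache() with no arguments.
    private final List<String> retained = new ArrayList<>();
    // Each subscriber is modelled as a plain log of the signals it has received.
    private final List<List<String>> subscribers = new ArrayList<>();
    private boolean terminated;

    /** Subscribes: replays everything retained so far, then keeps receiving live signals. */
    List<String> subscribe() {
        List<String> received = new ArrayList<>(retained);
        if (!terminated) {
            subscribers.add(received);
        }
        return received;
    }

    void next(int value) {
        publish("onNext(" + value + ")");
    }

    void complete() {
        publish("onComplete");
        terminated = true;
    }

    private void publish(String signal) {
        if (terminated) {
            return; // nothing is emitted after the terminal signal
        }
        retained.add(signal);
        for (List<String> subscriber : subscribers) {
            subscriber.add(signal);
        }
    }

    public static void main(String[] args) {
        ReplayAllSketch source = new ReplayAllSketch();
        source.next(1);
        source.next(2);
        source.complete();
        // A subscriber arriving after completion still sees the full history.
        System.out.println(source.subscribe());
    }
}
```

In this model a subscriber that arrives before, during, or after emission ends up with the same sequence of signals, which is the property the description attributes to cache().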
Code example source: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}. Will
 * retain an unbounded volume of onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheForFlux.svg" alt="">
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache() {
    return cache(Integer.MAX_VALUE);
}
Code example source: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain an unbounded history but apply a per-item expiry timeout
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForFlux.svg" alt="">
 *
 * @param ttl Time-to-live for each cached item and post termination.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(Duration ttl, Scheduler timer) {
    return cache(Integer.MAX_VALUE, ttl, timer);
}
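The Javadoc above says that once the ttl has elapsed after termination, the next Subscriber starts a new subscription. The plain-JDK sketch below models only that post-termination rule. `TtlResubscribeSketch`, the `Supplier` standing in for the upstream, and the injectable `LongSupplier` clock (playing the role of the timer Scheduler) are all hypothetical. Treating an entry as expired when its age is greater than or equal to the ttl is also an assumption, not something the source states.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical model: a completed result is replayed until ttl elapses, then the source is re-run. */
final class TtlResubscribeSketch<T> {
    private final Supplier<List<T>> source; // stands in for subscribing to the upstream
    private final long ttlMillis;
    private final LongSupplier clock;       // injectable clock, in the spirit of the timer Scheduler
    private List<T> cached;                 // null until the first subscription
    private long cachedAt;

    TtlResubscribeSketch(Supplier<List<T>> source, long ttlMillis, LongSupplier clock) {
        this.source = source;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached result while it is fresh; otherwise re-runs the source. */
    List<T> subscribe() {
        long now = clock.getAsLong();
        // Assumption: an entry counts as expired once its age reaches ttlMillis.
        if (cached == null || now - cachedAt >= ttlMillis) {
            cached = List.copyOf(source.get());
            cachedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        int[] runs = {0};
        TtlResubscribeSketch<Integer> cache =
                new TtlResubscribeSketch<>(() -> List.of(++runs[0]), 2000L, () -> now[0]);
        System.out.println(cache.subscribe()); // first subscription runs the source
        now[0] = 3000L;                        // move the clock past the ttl
        System.out.println(cache.subscribe()); // the source is run again
    }
}
```

The sketch deliberately ignores per-item expiry while the source is still emitting; it only shows why a subscriber arriving after the ttl can observe a fresh value.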
Code example source: spring-projects/spring-framework
public MockClientHttpRequest(HttpMethod httpMethod, URI url) {
    this.httpMethod = httpMethod;
    this.url = url;
    this.writeHandler = body -> {
        this.body = body.cache();
        return this.body.then();
    };
}
Code example source: spring-projects/spring-framework
public MockServerHttpResponse() {
    super(new DefaultDataBufferFactory());
    this.writeHandler = body -> {
        this.body = body.cache();
        return this.body.then();
    };
}
Code example source: spring-cloud/spring-cloud-gateway
private Function<Flux<DataBuffer>, Mono<Void>> initDefaultWriteHandler() {
    return body -> {
        this.body = body.cache();
        return this.body.then();
    };
}
Code example source: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain an unbounded history but apply a per-item expiry timeout
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForFlux.svg" alt="">
 *
 * @param ttl Time-to-live for each cached item and post termination.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(Duration ttl) {
    return cache(ttl, Schedulers.parallel());
}
Code example source: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain up to the given history size and apply a per-item expiry
 * timeout.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlAndMaxLimitForFlux.svg" alt="">
 *
 * @param history number of elements retained in cache
 * @param ttl Time-to-live for each cached item and post termination.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(int history, Duration ttl) {
    return cache(history, ttl, Schedulers.parallel());
}
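The Javadoc above combines two retention limits: a history size and a per-item time-to-live. The following plain-JDK sketch models how the two limits might interact for a late subscriber. `BoundedTtlHistorySketch`, its `Entry` helper, and the injectable clock are hypothetical names, and the expiry boundary (age greater than or equal to the ttl) is an assumption; this is not how Reactor implements the operator internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical model of a replay buffer limited by both size and per-item age. */
final class BoundedTtlHistorySketch<T> {
    private static final class Entry<V> {
        final V value;
        final long emittedAt;

        Entry(V value, long emittedAt) {
            this.value = value;
            this.emittedAt = emittedAt;
        }
    }

    private final Deque<Entry<T>> buffer = new ArrayDeque<>();
    private final int history;
    private final long ttlMillis;
    private final LongSupplier clock;

    BoundedTtlHistorySketch(int history, long ttlMillis, LongSupplier clock) {
        this.history = history;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Records an item and drops the oldest one if the size limit is exceeded. */
    void next(T value) {
        buffer.addLast(new Entry<>(value, clock.getAsLong()));
        if (buffer.size() > history) {
            buffer.removeFirst();
        }
    }

    /** Returns what a late subscriber would be replayed: retained items that have not expired. */
    List<T> replay() {
        long now = clock.getAsLong();
        // Items are stored oldest first, so expired ones are always at the head.
        while (!buffer.isEmpty() && now - buffer.peekFirst().emittedAt >= ttlMillis) {
            buffer.removeFirst();
        }
        List<T> result = new ArrayList<>();
        for (Entry<T> entry : buffer) {
            result.add(entry.value);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        BoundedTtlHistorySketch<Integer> cache =
                new BoundedTtlHistorySketch<>(2, 2000L, () -> now[0]);
        for (int i = 1; i <= 3; i++) {
            now[0] = i * 1000L; // one item per simulated second
            cache.next(i);
        }
        System.out.println(cache.replay());
    }
}
```

With a history of 2 and three items emitted one second apart, the first item is dropped by the size limit; the remaining items then disappear one by one as the clock moves past their individual expiry times.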
Code example source: spring-projects/spring-framework
public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
    super(dataBufferFactory);
    this.writeHandler = body -> {
        this.body = body.cache();
        return this.body.then();
    };
}
Code example source: reactor/reactor-core
@Test
public void testParallelism() throws Exception {
    Flux<Integer> flux = Flux.just(1, 2, 3);
    Set<String> threadNames = Collections.synchronizedSet(new TreeSet<>());
    AtomicInteger count = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(3);
    flux
        // Uncomment line below for failure
        .cache(1)
        .parallel(3)
        .runOn(Schedulers.newElastic("TEST"))
        .subscribe(i -> {
            threadNames.add(Thread.currentThread().getName());
            count.incrementAndGet();
            latch.countDown();
            tryToSleep(1000);
        });
    latch.await();
    Assert.assertEquals("Multithreaded count", 3, count.get());
    Assert.assertEquals("Multithreaded threads", 3, threadNames.size());
}
Code example source: reactor/reactor-core
@Test
public void sampleCombineLatestTest() throws Exception {
    int elements = 40;
    CountDownLatch latch = new CountDownLatch(elements / 2 - 2);
    Flux.combineLatest(
            sensorOdd().cache().delayElements(Duration.ofMillis(100)),
            sensorEven().cache().delayElements(Duration.ofMillis(200)),
            this::computeMin)
        .log("combineLatest")
        .subscribe(i -> latch.countDown(), null, latch::countDown);
    generateData(elements);
    awaitLatch(null, latch);
}
Code example source: reactor/reactor-core
@Test
public void issue422() {
    Flux<Integer> source = Flux.create((sink) -> {
        for (int i = 0; i < 300; i++) {
            sink.next(i);
        }
        sink.complete();
    });
    Flux<Integer> cached = source.cache();
    long cachedCount = cached.concatMapIterable(Collections::singleton)
                             .distinct().count().block();
    //System.out.println("source: " + sourceCount);
    System.out.println("cached: " + cachedCount);
}
Code example source: reactor/reactor-core
@Test
public void cacheContextHistory() {
    AtomicInteger contextFillCount = new AtomicInteger();
    Flux<String> cached = Flux.just(1, 2)
        .flatMap(i -> Mono.subscriberContext()
                          .map(ctx -> ctx.getOrDefault("a", "BAD"))
        )
        .cache(1)
        .subscriberContext(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));
    //at first pass, the context is captured
    String cacheMiss = cached.blockLast();
    assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1");
    assertThat(contextFillCount).as("cacheMiss").hasValue(1);
    //at second subscribe, the Context fill attempt is still done, but ultimately ignored since first context is cached
    String cacheHit = cached.blockLast();
    assertThat(cacheHit).as("cacheHit").isEqualTo("GOOD1"); //value from the cache
    assertThat(contextFillCount).as("cacheHit").hasValue(2); //function was still invoked
    //at third subscribe, function is called for the 3rd time, but the context is still cached
    String cacheHit2 = cached.blockLast();
    assertThat(cacheHit2).as("cacheHit2").isEqualTo("GOOD1");
    assertThat(contextFillCount).as("cacheHit2").hasValue(3);
    //at fourth subscribe, function is called for the 4th time, but the context is still cached
    String cacheHit3 = cached.blockLast();
    assertThat(cacheHit3).as("cacheHit3").isEqualTo("GOOD1");
    assertThat(contextFillCount).as("cacheHit3").hasValue(4);
}
Code example source: reactor/reactor-core
@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
    TopicProcessor<String> broadcast = TopicProcessor.<String>builder()
        .name("share-name")
        .bufferSize(1)
        .autoCancel(true)
        .build();
    int simultaneousSubscribers = 3000;
    CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
    Scheduler scheduler = Schedulers.single();
    FluxSink<String> sink = broadcast.sink();
    Flux<String> flux = broadcast.filter(Objects::nonNull)
                                 .publishOn(scheduler)
                                 .cache(1);
    for (int i = 0; i < simultaneousSubscribers; i++) {
        flux.subscribe(s -> latch.countDown());
    }
    sink.next("data");
    assertThat(latch.await(4, TimeUnit.SECONDS))
        .overridingErrorMessage("Data not received")
        .isTrue();
}
Code example source: reactor/reactor-core
@Test
public void cacheFluxTTL2() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    AtomicInteger i = new AtomicInteger(0);
    Flux<Integer> source = Flux.defer(() -> Flux.just(i.incrementAndGet()))
                               .cache(Duration.ofMillis(2000), vts);
    StepVerifier.create(source)
                .expectNext(1)
                .verifyComplete();
    StepVerifier.create(source)
                .expectNext(1)
                .verifyComplete();
    vts.advanceTimeBy(Duration.ofSeconds(3));
    StepVerifier.create(source)
                .expectNext(2)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
    WorkQueueProcessor<String> broadcast = WorkQueueProcessor.<String>builder()
        .share(true).name("share-name")
        .bufferSize(1)
        .autoCancel(true)
        .build();
    int simultaneousSubscribers = 3000;
    CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
    Scheduler scheduler = Schedulers.single();
    FluxSink<String> sink = broadcast.sink();
    Flux<String> flux = broadcast.filter(Objects::nonNull)
                                 .publishOn(scheduler)
                                 .cache(1);
    for (int i = 0; i < simultaneousSubscribers; i++) {
        flux.subscribe(s -> latch.countDown());
    }
    sink.next("data");
    Assertions.assertThat(latch.await(4, TimeUnit.SECONDS))
              .overridingErrorMessage("Data not received")
              .isTrue();
}
Code example source: reactor/reactor-core
@Test
public void allSameFusable() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    ts.requestedFusionMode(Fuseable.ANY);
    Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
        .distinct(k -> k)
        .filter(t -> true)
        .map(t -> t)
        .cache(4)
        .subscribe(ts);
    ts.assertValues(1)
      .assertFuseableSource()
      .assertFusionEnabled()
      .assertFusionMode(Fuseable.ASYNC)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void cacheFluxTTL() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000), vts)
                                             .cache(Duration.ofMillis(2000), vts)
                                             .elapsed(vts);
    StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
                .thenAwait(Duration.ofSeconds(3))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
                .thenAwait(Duration.ofSeconds(3))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void cacheFluxHistoryTTL() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000), vts)
                                             .cache(2, Duration.ofMillis(2000), vts)
                                             .elapsed(vts);
    StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
                .thenAwait(Duration.ofSeconds(3))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
                .thenAwait(Duration.ofSeconds(3))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void cacheFlux() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000), vts)
                                             .cache()
                                             .elapsed(vts);
    StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
                .thenAwait(Duration.ofSeconds(3))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
                .thenAwait(Duration.ofSeconds(3))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}