Usage and code examples for the reactor.core.publisher.Flux.cache() method

Reposted by x33g5p2x on 2022-01-19 in Other

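This article collects code examples for the Java method reactor.core.publisher.Flux.cache() and shows how Flux.cache() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms and were extracted from selected projects, so they should serve as a useful reference. The details of Flux.cache() are as follows:
Package: reactor.core.publisher
Class name: Flux
Method name: cache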

About Flux.cache

Turns this Flux into a hot source and caches the last emitted signals for later Subscribers. It retains an unbounded number of onNext signals. Completion and error signals are also replayed.
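
The replay behavior described above can be illustrated with a minimal sketch. It assumes reactor-core is on the classpath; the class name CacheReplayExample, the method name, and the subscription counter are illustrative and do not come from the projects quoted below.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Hypothetical demo class: shows that cache() subscribes to the upstream
// once and replays the recorded signals to later subscribers.
class CacheReplayExample {

    // Subscribes twice to a cached Flux and returns how many times the
    // upstream source was actually subscribed to.
    static int countUpstreamSubscriptions() {
        AtomicInteger upstreamSubscriptions = new AtomicInteger();

        // defer() runs the supplier on every upstream subscription,
        // which lets the counter observe each one.
        Flux<Integer> cached = Flux.defer(() -> {
            upstreamSubscriptions.incrementAndGet();
            return Flux.just(1, 2, 3);
        }).cache();

        // The first subscriber triggers the single upstream subscription.
        List<Integer> first = cached.collectList().block();
        // The second subscriber receives the replayed items and completion.
        List<Integer> second = cached.collectList().block();

        System.out.println("first:  " + first);
        System.out.println("second: " + second);
        return upstreamSubscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("upstream subscriptions: " + countUpstreamSubscriptions());
    }
}
```

Both subscribers see the same three items, while the upstream is subscribed to only once. Because cache() without arguments keeps every onNext signal, a long-running or infinite source may be better served by the bounded cache(int history) or the time-limited cache(Duration ttl) overloads shown in the examples that follow.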

Code examples

Source: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}. Will
 * retain an unbounded volume of onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheForFlux.svg" alt="">
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache() {
  return cache(Integer.MAX_VALUE);
}

Source: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain an unbounded history but apply a per-item expiry timeout
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForFlux.svg" alt="">
 *
 * @param ttl Time-to-live for each cached item and post termination.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(Duration ttl, Scheduler timer) {
  return cache(Integer.MAX_VALUE, ttl, timer);
}

Source: spring-projects/spring-framework

public MockClientHttpRequest(HttpMethod httpMethod, URI url) {
  this.httpMethod = httpMethod;
  this.url = url;
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();
  };
}

Source: spring-projects/spring-framework

public MockServerHttpResponse() {
  super(new DefaultDataBufferFactory());
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();
  };
}

Source: spring-cloud/spring-cloud-gateway

private Function<Flux<DataBuffer>, Mono<Void>> initDefaultWriteHandler() {
  return body -> {
    this.body = body.cache();
    return this.body.then();
  };
}

Source: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain an unbounded history but apply a per-item expiry timeout
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForFlux.svg" alt="">
 *
 * @param ttl Time-to-live for each cached item and post termination.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(Duration ttl) {
  return cache(ttl, Schedulers.parallel());
}

Source: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain up to the given history size and apply a per-item expiry
 * timeout.
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlAndMaxLimitForFlux.svg" alt="">
 *
 * @param history number of elements retained in cache
 * @param ttl Time-to-live for each cached item and post termination.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(int history, Duration ttl) {
  return cache(history, ttl, Schedulers.parallel());
}

Source: spring-projects/spring-framework

public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
  super(dataBufferFactory);
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();
  };
}

Source: reactor/reactor-core

@Test
public void testParallelism() throws Exception
{
  Flux<Integer> flux = Flux.just(1, 2, 3);
  Set<String> threadNames = Collections.synchronizedSet(new TreeSet<>());
  AtomicInteger count = new AtomicInteger();
  CountDownLatch latch = new CountDownLatch(3);
  flux
      // Uncomment line below for failure
      .cache(1)
      .parallel(3)
      .runOn(Schedulers.newElastic("TEST"))
      .subscribe(i ->
      {
        threadNames.add(Thread.currentThread()
                   .getName());
        count.incrementAndGet();
        latch.countDown();
        tryToSleep(1000);
      });
  latch.await();
  Assert.assertEquals("Multithreaded count", 3, count.get());
  Assert.assertEquals("Multithreaded threads", 3, threadNames.size());
}

Source: reactor/reactor-core

@Test
public void sampleCombineLatestTest() throws Exception {
  int elements = 40;
  CountDownLatch latch = new CountDownLatch(elements / 2 - 2);
  Flux.combineLatest(
      sensorOdd().cache().delayElements(Duration.ofMillis(100)),
      sensorEven().cache().delayElements(Duration.ofMillis(200)),
      this::computeMin)
              .log("combineLatest")
              .subscribe(i -> latch.countDown(), null, latch::countDown);
  generateData(elements);
  awaitLatch(null, latch);
}

Source: reactor/reactor-core

@Test
public void issue422(){
  Flux<Integer> source = Flux.create((sink) -> {
    for (int i = 0; i < 300; i++) {
      sink.next(i);
    }
    sink.complete();
  });
  Flux<Integer> cached = source.cache();
  long cachedCount = cached.concatMapIterable(Collections::singleton)
               .distinct().count().block();
  //System.out.println("source: " + sourceCount);
  System.out.println("cached: " + cachedCount);
}

Source: reactor/reactor-core

@Test
public void cacheContextHistory() {
  AtomicInteger contextFillCount = new AtomicInteger();
  Flux<String> cached = Flux.just(1, 2)
               .flatMap(i -> Mono.subscriberContext()
                        .map(ctx -> ctx.getOrDefault("a", "BAD"))
               )
               .cache(1)
               .subscriberContext(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));
  //at first pass, the context is captured
  String cacheMiss = cached.blockLast();
  assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1");
  assertThat(contextFillCount).as("cacheMiss").hasValue(1);
  //at second subscribe, the Context fill attempt is still done, but ultimately ignored since first context is cached
  String cacheHit = cached.blockLast();
  assertThat(cacheHit).as("cacheHit").isEqualTo("GOOD1"); //value from the cache
  assertThat(contextFillCount).as("cacheHit").hasValue(2); //function was still invoked
  //at third subscribe, function is called for the 3rd time, but the context is still cached
  String cacheHit2 = cached.blockLast();
  assertThat(cacheHit2).as("cacheHit2").isEqualTo("GOOD1");
  assertThat(contextFillCount).as("cacheHit2").hasValue(3);
  //at fourth subscribe, function is called for the 4th time, but the context is still cached
  String cacheHit3 = cached.blockLast();
  assertThat(cacheHit3).as("cacheHit3").isEqualTo("GOOD1");
  assertThat(contextFillCount).as("cacheHit3").hasValue(4);
}

Source: reactor/reactor-core

@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
  TopicProcessor<String> broadcast = TopicProcessor.<String>builder().name("share-name").bufferSize(1).autoCancel(true).build();
  int simultaneousSubscribers = 3000;
  CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
  Scheduler scheduler = Schedulers.single();
  FluxSink<String> sink = broadcast.sink();
  Flux<String> flux = broadcast.filter(Objects::nonNull)
                 .publishOn(scheduler)
                 .cache(1);
  for (int i = 0; i < simultaneousSubscribers; i++) {
    flux.subscribe(s -> latch.countDown());
  }
  sink.next("data");
  assertThat(latch.await(4, TimeUnit.SECONDS))
      .overridingErrorMessage("Data not received")
      .isTrue();
}

Source: reactor/reactor-core

@Test
public void cacheFluxTTL2() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  AtomicInteger i = new AtomicInteger(0);
  Flux<Integer> source = Flux.defer(() -> Flux.just(i.incrementAndGet()))
                .cache(Duration.ofMillis(2000), vts);
  StepVerifier.create(source)
        .expectNext(1)
        .verifyComplete();
  StepVerifier.create(source)
        .expectNext(1)
        .verifyComplete();
  vts.advanceTimeBy(Duration.ofSeconds(3));
  StepVerifier.create(source)
        .expectNext(2)
        .verifyComplete();
}

Source: reactor/reactor-core

@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
  WorkQueueProcessor<String> broadcast = WorkQueueProcessor.<String>builder()
      .share(true).name("share-name")
      .bufferSize(1)
      .autoCancel(true)
      .build();
  int simultaneousSubscribers = 3000;
  CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
  Scheduler scheduler = Schedulers.single();
  FluxSink<String> sink = broadcast.sink();
  Flux<String> flux = broadcast.filter(Objects::nonNull)
                 .publishOn(scheduler)
                 .cache(1);
  for (int i = 0; i < simultaneousSubscribers; i++) {
    flux.subscribe(s -> latch.countDown());
  }
  sink.next("data");
  Assertions.assertThat(latch.await(4, TimeUnit.SECONDS))
       .overridingErrorMessage("Data not received")
       .isTrue();
}

Source: reactor/reactor-core

@Test
public void allSameFusable() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  ts.requestedFusionMode(Fuseable.ANY);
  Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
    .distinct(k -> k)
    .filter(t -> true)
    .map(t -> t)
    .cache(4)
    .subscribe(ts);
  ts.assertValues(1)
   .assertFuseableSource()
   .assertFusionEnabled()
   .assertFusionMode(Fuseable.ASYNC)
   .assertComplete()
   .assertNoError();
}

Source: reactor/reactor-core

@Test
public void cacheFluxTTL() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000)
                           , vts)
                       .cache(Duration.ofMillis(2000), vts)
                       .elapsed(vts);
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Source: reactor/reactor-core

@Test
public void cacheFluxHistoryTTL() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis
                           (1000), vts)
                       .cache(2, Duration.ofMillis(2000), vts)
                       .elapsed(vts);
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Source: reactor/reactor-core

@Test
public void cacheFlux() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000)
                           , vts)
                       .cache()
                       .elapsed(vts);
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}
