
x33g5p2x  于2022-01-19 转载在 其他  



[英]Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.


代码示例来源:origin: reactor/reactor-core

 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}. Will
 * retain an unbounded volume of onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheForFlux.svg" alt="">
 * @return a replaying {@link Flux}
public final Flux<T> cache() {
  return cache(Integer.MAX_VALUE);

代码示例来源:origin: reactor/reactor-core

 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain an unbounded history but apply a per-item expiry timeout
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForFlux.svg" alt="">
 * @param ttl Time-to-live for each cached item and post termination.
 * @param timer the {@link Scheduler} on which to measure the duration.
 * @return a replaying {@link Flux}
public final Flux<T> cache(Duration ttl, Scheduler timer) {
  return cache(Integer.MAX_VALUE, ttl, timer);

代码示例来源:origin: spring-projects/spring-framework

public MockClientHttpRequest(HttpMethod httpMethod, URI url) {
  this.httpMethod = httpMethod;
  this.url = url;
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();

代码示例来源:origin: spring-projects/spring-framework

public MockServerHttpResponse() {
  super(new DefaultDataBufferFactory());
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();

代码示例来源:origin: spring-cloud/spring-cloud-gateway

private Function<Flux<DataBuffer>, Mono<Void>> initDefaultWriteHandler() {
  return body -> {
    this.body = body.cache();
    return this.body.then();

代码示例来源:origin: reactor/reactor-core

 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain an unbounded history but apply a per-item expiry timeout
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlForFlux.svg" alt="">
 * @param ttl Time-to-live for each cached item and post termination.
 * @return a replaying {@link Flux}
public final Flux<T> cache(Duration ttl) {
  return cache(ttl, Schedulers.parallel());

代码示例来源:origin: reactor/reactor-core

 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain up to the given history size and apply a per-item expiry
 * timeout.
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlAndMaxLimitForFlux.svg" alt="">
 * @param history number of elements retained in cache
 * @param ttl Time-to-live for each cached item and post termination.
 * @return a replaying {@link Flux}
public final Flux<T> cache(int history, Duration ttl) {
  return cache(history, ttl, Schedulers.parallel());

代码示例来源:origin: spring-projects/spring-framework

public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();

代码示例来源:origin: spring-projects/spring-framework

public MockClientHttpRequest(HttpMethod httpMethod, URI url) {
  this.httpMethod = httpMethod;
  this.url = url;
  this.writeHandler = body -> {
    this.body = body.cache();
    return this.body.then();

代码示例来源:origin: reactor/reactor-core

public void testParallelism() throws Exception
  Flux<Integer> flux = Flux.just(1, 2, 3);
  Set<String> threadNames = Collections.synchronizedSet(new TreeSet<>());
  AtomicInteger count = new AtomicInteger();
  CountDownLatch latch = new CountDownLatch(3);
      // Uncomment line below for failure
      .subscribe(i ->
  Assert.assertEquals("Multithreaded count", 3, count.get());
  Assert.assertEquals("Multithreaded threads", 3, threadNames.size());

代码示例来源:origin: reactor/reactor-core

public void sampleCombineLatestTest() throws Exception {
  int elements = 40;
  CountDownLatch latch = new CountDownLatch(elements / 2 - 2);
              .subscribe(i -> latch.countDown(), null, latch::countDown);
  awaitLatch(null, latch);

代码示例来源:origin: reactor/reactor-core

public void issue422(){
  Flux<Integer> source = Flux.create((sink) -> {
    for (int i = 0; i < 300; i++) {;
  Flux<Integer> cached = source.cache();
  long cachedCount = cached.concatMapIterable(Collections::singleton)
  //System.out.println("source: " + sourceCount);
  System.out.println("cached: " + cachedCount);

代码示例来源:origin: reactor/reactor-core

public void cacheContextHistory() {
  AtomicInteger contextFillCount = new AtomicInteger();
  Flux<String> cached = Flux.just(1, 2)
               .flatMap(i -> Mono.subscriberContext()
                        .map(ctx -> ctx.getOrDefault("a", "BAD"))
               .subscriberContext(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));
  //at first pass, the context is captured
  String cacheMiss = cached.blockLast();
  //at second subscribe, the Context fill attempt is still done, but ultimately ignored since first context is cached
  String cacheHit = cached.blockLast();
  assertThat(cacheHit).as("cacheHit").isEqualTo("GOOD1"); //value from the cache
  assertThat(contextFillCount).as("cacheHit").hasValue(2); //function was still invoked
  //at third subscribe, function is called for the 3rd time, but the context is still cached
  String cacheHit2 = cached.blockLast();
  //at fourth subscribe, function is called for the 4th time, but the context is still cached
  String cacheHit3 = cached.blockLast();

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
  TopicProcessor<String> broadcast = TopicProcessor.<String>builder().name("share-name").bufferSize(1).autoCancel(true).build();
  int simultaneousSubscribers = 3000;
  CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
  Scheduler scheduler = Schedulers.single();
  FluxSink<String> sink = broadcast.sink();
  Flux<String> flux = broadcast.filter(Objects::nonNull)
  for (int i = 0; i < simultaneousSubscribers; i++) {
    flux.subscribe(s -> latch.countDown());
  assertThat(latch.await(4, TimeUnit.SECONDS))
      .overridingErrorMessage("Data not received")

代码示例来源:origin: reactor/reactor-core

public void cacheFluxTTL2() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  AtomicInteger i = new AtomicInteger(0);
  Flux<Integer> source = Flux.defer(() -> Flux.just(i.incrementAndGet()))
                .cache(Duration.ofMillis(2000), vts);

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5_000)
public void testBufferSize1Created() throws Exception {
  WorkQueueProcessor<String> broadcast = WorkQueueProcessor.<String>builder()
  int simultaneousSubscribers = 3000;
  CountDownLatch latch = new CountDownLatch(simultaneousSubscribers);
  Scheduler scheduler = Schedulers.single();
  FluxSink<String> sink = broadcast.sink();
  Flux<String> flux = broadcast.filter(Objects::nonNull)
  for (int i = 0; i < simultaneousSubscribers; i++) {
    flux.subscribe(s -> latch.countDown());
  Assertions.assertThat(latch.await(4, TimeUnit.SECONDS))
       .overridingErrorMessage("Data not received")

代码示例来源:origin: reactor/reactor-core

public void allSameFusable() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
    .distinct(k -> k)
    .filter(t -> true)
    .map(t -> t)

代码示例来源:origin: reactor/reactor-core

public void cacheFluxTTL() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                           , vts)
                       .cache(Duration.ofMillis(2000), vts)
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)

代码示例来源:origin: reactor/reactor-core

public void cacheFluxHistoryTTL() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                           (1000), vts)
                       .cache(2, Duration.ofMillis(2000), vts)
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)

代码示例来源:origin: reactor/reactor-core

public void cacheFlux() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                           , vts)
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)




