Usage and code examples for the reactor.core.publisher.Flux.delaySubscription() method

x33g5p2x, reposted on 2022-01-19 under Other

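This article collects code examples for the Java method reactor.core.publisher.Flux.delaySubscription() and shows how Flux.delaySubscription() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should be useful as reference material. Details of the Flux.delaySubscription() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: delaySubscription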

About Flux.delaySubscription

Delays the subscription (Flux#subscribe(Subscriber)) to this Flux source until the given period elapses. By default, the delay is introduced through the Schedulers#parallel() Scheduler.
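As a rough illustration of the description above, the sketch below measures how long the source stays unsubscribed after the delayed Flux itself has been subscribed to. The class name DelaySubscriptionTimingSketch, the helper measureSubscriptionDelay, and the 200 ms delay are assumptions made for this example and do not come from reactor-core; the Reactor calls used are Flux.defer, Flux.just, delaySubscription(Duration), collectList, and block.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of reactor-core.
final class DelaySubscriptionTimingSketch {

    /**
     * Returns the milliseconds between subscribing to the delayed Flux
     * and the moment the underlying source is actually subscribed to.
     */
    static long measureSubscriptionDelay(Duration delay) {
        AtomicLong sourceSubscribedAt = new AtomicLong();

        // Flux.defer runs its supplier once per subscription, so the timestamp
        // is taken at the moment the source gets subscribed.
        Flux<Integer> source = Flux.defer(() -> {
            sourceSubscribedAt.set(System.nanoTime());
            return Flux.just(1, 2, 3);
        });

        long start = System.nanoTime();
        List<Integer> values = source
                .delaySubscription(delay) // timer runs on Schedulers.parallel() by default
                .collectList()
                .block();                 // wait for completion (acceptable in a demo only)

        if (values == null || values.size() != 3) {
            throw new IllegalStateException("unexpected values: " + values);
        }
        return Duration.ofNanos(sourceSubscribedAt.get() - start).toMillis();
    }

    public static void main(String[] args) {
        long millis = measureSubscriptionDelay(Duration.ofMillis(200));
        System.out.println("source subscribed after about " + millis + " ms");
    }
}
```

The measured value should be at least roughly the requested delay. The exact figure depends on timer precision and machine load, so no fixed output is shown.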

Code examples

Code example source: origin: reactor/reactor-core

/**
 * Delay the {@link Flux#subscribe(Subscriber) subscription} to this {@link Flux} source until the given
 * period elapses, as measured on the user-provided {@link Scheduler}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForFlux.svg" alt="">
 *
 * @param delay {@link Duration} before subscribing this {@link Flux}
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a delayed {@link Flux}
 */
public final Flux<T> delaySubscription(Duration delay, Scheduler timer) {
  return delaySubscription(Mono.delay(delay, timer));
}
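To make the role of the timer parameter more concrete, here is a minimal sketch that passes a dedicated Scheduler and reports which thread ends up emitting the element. The class name DelaySubscriptionSchedulerSketch, the helper emittingThreadName, and the thread name prefix "delay-timer" are assumptions for illustration. In this setup the element is expected to be emitted on the timer's thread, because the source is subscribed from the thread on which the delay fires.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of reactor-core.
final class DelaySubscriptionSchedulerSketch {

    /** Returns the name of the thread on which the delayed source emitted its element. */
    static String emittingThreadName() {
        // "delay-timer" is an arbitrary thread name prefix chosen for this sketch.
        Scheduler timer = Schedulers.newSingle("delay-timer");
        try {
            return Flux.just(1)
                    .delaySubscription(Duration.ofMillis(50), timer)
                    .map(i -> Thread.currentThread().getName())
                    .blockFirst();
        } finally {
            timer.dispose(); // release the dedicated timer thread
        }
    }

    public static void main(String[] args) {
        System.out.println("emitted on thread: " + emittingThreadName());
    }
}
```

Using a dedicated Scheduler like this keeps the delay off the shared parallel Scheduler, which can help when the timing of a pipeline needs to be isolated or controlled in tests.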

Code example source: origin: reactor/reactor-core

/**
 * Delay the {@link Flux#subscribe(Subscriber) subscription} to this {@link Flux} source until the given
 * period elapses. The delay is introduced through the {@link Schedulers#parallel() parallel} default Scheduler.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delaySubscriptionForFlux.svg" alt="">
 *
 * @param delay duration before subscribing this {@link Flux}
 *
 * @return a delayed {@link Flux}
 *
 */
public final Flux<T> delaySubscription(Duration delay) {
  return delaySubscription(delay, Schedulers.parallel());
}

Code example source: origin: reactor/reactor-core

Flux<Integer> scenario_delayedTrigger(){
  return Flux.just(1)
        .delaySubscription(Duration.ofSeconds(3));
}

Code example source: origin: reactor/reactor-core

Flux<Integer> scenario_delayedTrigger2(){
  return Flux.just(1)
        .delaySubscription(Duration.ofMillis(50));
}

Code example source: origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void otherNull() {
  Flux.never().delaySubscription((Publisher<?>)null);
}
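The test above uses the overload that takes a Publisher as the trigger instead of a Duration. As a hedged sketch of how that overload might be used outside a test, the example below delays the subscription until a CompletableFuture completes. The class name DelaySubscriptionTriggerSketch, the helper valuesBeforeAndAfterTrigger, the "go" value, and the one-second timeout are all assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of reactor-core.
final class DelaySubscriptionTriggerSketch {

    /** Returns two snapshots of the received values: before and after the trigger fired. */
    static List<List<Integer>> valuesBeforeAndAfterTrigger() throws InterruptedException {
        CompletableFuture<String> ready = new CompletableFuture<>();
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flux.range(1, 3)
            // The source is subscribed to only once the trigger publisher signals.
            .delaySubscription(Mono.fromFuture(ready))
            .subscribe(received::add, error -> done.countDown(), done::countDown);

        List<Integer> before = List.copyOf(received); // trigger has not fired yet

        ready.complete("go");                          // fire the trigger
        done.await(1, TimeUnit.SECONDS);               // wait for the pipeline to finish
        List<Integer> after = List.copyOf(received);

        return List.of(before, after);
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Integer>> snapshots = valuesBeforeAndAfterTrigger();
        System.out.println("before trigger: " + snapshots.get(0));
        System.out.println("after trigger:  " + snapshots.get(1));
    }
}
```

The value carried by the trigger is ignored; only the fact that it signals matters, which is why the trigger's element type can differ from that of the delayed Flux.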

Code example source: origin: reactor/reactor-core

@Test
public void windowWithTimeoutStartsTimerOnSubscription() {
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(300))
        .thenMany(Flux.range(1, 3))
        .delayElements(Duration.ofMillis(150))
        .concatWith(Flux.range(4, 10).delaySubscription(Duration.ofMillis(500)))
        .windowTimeout(10, Duration.ofMillis(500))
        .flatMap(Flux::collectList)
  )
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(100))
        .assertNext(l -> assertThat(l).containsExactly(1))
        .assertNext(l -> assertThat(l).containsExactly(2, 3))
        .assertNext(l -> assertThat(l).containsExactly(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
        .assertNext(l -> assertThat(l).isEmpty())
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void testSubmitSession() throws Exception {
  FluxProcessor<Integer, Integer> processor = EmitterProcessor.create();
  AtomicInteger count = new AtomicInteger();
  CountDownLatch latch = new CountDownLatch(1);
  Scheduler scheduler = Schedulers.parallel();
  processor.publishOn(scheduler)
       .delaySubscription(Duration.ofMillis(1000))
       .limitRate(1)
       .subscribe(d -> {
         count.incrementAndGet();
         latch.countDown();
       });
  FluxSink<Integer> session = processor.sink();
  session.next(1);
  //System.out.println(emission);
  session.complete();
  latch.await(5, TimeUnit.SECONDS);
  Assert.assertTrue("latch : " + count, count.get() == 1);
  scheduler.dispose();
}

Code example source: origin: reactor/reactor-core

@Test
public void shouldSendOnErrorSignalWithDelaySubscription() {
  Signal<? extends Long>[] first = new Signal[1];
  RuntimeException error = new RuntimeException();
  StepVerifier.create(Flux.<Long>error(error)
      .switchOnFirst((s, f) -> {
        first[0] = s;
        return f.delaySubscription(Duration.ofMillis(100));
      }))
        .expectSubscription()
        .expectError(RuntimeException.class)
        .verify();
  Assertions.assertThat(first).containsExactly(Signal.error(error));
}

Code example source: origin: reactor/reactor-core

@Test
public void timeoutDropWhenNoCancelWithoutFallback() {
  for (int i = 0; i < 50; i++) {
    StepVerifier.withVirtualTime(
        () -> Flux.just("cat")
             .delaySubscription(Duration.ofMillis(3))
             // We cancel on another scheduler that won't do anything to force it to act like
             // the event is already in flight
             .cancelOn(Schedulers.fromExecutor(r -> {}))
             .timeout(Duration.ofMillis(2))
    )
          .thenAwait(Duration.ofSeconds(5))
          .expectError(TimeoutException.class)
          .verify();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void shouldSendOnCompleteSignalWithDelaySubscription() {
  Signal<? extends Long>[] first = new Signal[1];
  StepVerifier.create(Flux.<Long>empty()
      .switchOnFirst((s, f) -> {
        first[0] = s;
        return f.delaySubscription(Duration.ofMillis(100));
      }))
        .expectSubscription()
        .expectComplete()
        .verify();
  Assertions.assertThat(first).containsExactly(Signal.complete());
}

Code example source: origin: reactor/reactor-core

@Test
public void timeoutDropWhenNoCancelWithFallback() {
  for (int i = 0; i < 50; i++) {
    StepVerifier.withVirtualTime(
        () -> Flux.just("cat")
             .delaySubscription(Duration.ofMillis(3))
             // We cancel on another scheduler that won't do anything to force it to act like
             // the event is already in flight
             .cancelOn(Schedulers.fromExecutor(r -> {}))
             .timeout(Duration.ofMillis(2), Flux.just("dog").delayElements(Duration.ofMillis(5)))
    )
          .thenAwait(Duration.ofSeconds(5))
          .expectNext("dog")
          .expectComplete()
          .verify();
  }
}

Code example source: origin: reactor/reactor-core

.delaySubscription(Duration.ofMillis(1L))
.log("streamed")
.map(it -> it * 2)

Code example source: origin: reactor/reactor-core

@Test
public void emptyTrigger() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .delaySubscription(Flux.empty())
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void neverTriggered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .delaySubscription(Flux.never())
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .delaySubscription(Flux.just(1))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void manyTriggered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .delaySubscription(Flux.range(1, 10))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void verifyVirtualTimeOnComplete() {
  StepVerifier.withVirtualTime(() -> Flux.empty()
                      .delaySubscription(Duration.ofHours(1))
                      .log())
        .thenAwait(Duration.ofHours(1))
        .expectComplete()
        .verify();
}

Code example source: origin: reactor/reactor-core

@Test
public void emptyTriggerBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)
    .delaySubscription(Flux.empty())
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNotComplete()
   .assertNoError();
  ts.request(5);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7)
   .assertNotComplete()
   .assertNoError();
  ts.request(10);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void manyTriggeredBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)
    .delaySubscription(Flux.range(1, 10))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNotComplete()
   .assertNoError();
  ts.request(5);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7)
   .assertNotComplete()
   .assertNoError();
  ts.request(10);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void normalBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)
    .delaySubscription(Flux.just(1))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNotComplete()
   .assertNoError();
  ts.request(5);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7)
   .assertNotComplete()
   .assertNoError();
  ts.request(10);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}
