Usage and code examples of the reactor.core.publisher.Flux.range() method

x33g5p2x, reposted on 2022-01-19 under "Other"

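This article collects code examples for the Java method reactor.core.publisher.Flux.range() and shows how Flux.range() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they serve as practical reference material. Details of the Flux.range() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: range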

Introduction to Flux.range

Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.
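The second argument of Flux.range is a count, not an end bound, which is easy to confuse with IntStream.range(startInclusive, endExclusive). The sketch below models the documented sequence using only the JDK, so it runs without Reactor on the classpath. The class RangeSemantics and the helper expectedRange are hypothetical names introduced for illustration; they are not part of Reactor, and the helper ignores the integer-overflow checks a real implementation would need.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class RangeSemantics {

    // Hypothetical helper (not a Reactor API): returns the values that
    // Flux.range(start, count) is documented to emit, i.e. the integers
    // from start (included) to start + count (excluded).
    // Assumes start + count does not overflow int.
    static List<Integer> expectedRange(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(expectedRange(1, 5)); // [1, 2, 3, 4, 5]
        System.out.println(expectedRange(5, 3)); // [5, 6, 7], not [5] or []
        System.out.println(expectedRange(0, 0)); // []
    }
}
```

Under this reading, Flux.range(5, 3) corresponds to the values 5, 6, 7 followed by completion, whereas IntStream.range(5, 3) would be empty.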

Code examples

Code example source: spring-projects/spring-framework

@GetMapping("/SPR-16051")
public Flux<String> errors() {
  return Flux.range(1, 10000)
      .map(i -> {
        if (i == 1000) {
          throw new RuntimeException("Random error");
        }
        return i + ". foo bar";
      });
}

Code example source: spring-projects/spring-framework

private Publisher<DataBuffer> multipleChunks() {
  int chunkSize = RESPONSE_SIZE / CHUNKS;
  return Flux.range(1, CHUNKS).map(integer -> randomBuffer(chunkSize));
}

Code example source: reactor/reactor-core

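Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize() {
  // Elements arrive about 300 ms apart, so the size limit (5) is reached
  // before the 2 s timeout; the sixth element is flushed on completion.
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .bufferTimeout(5, Duration.ofMillis(2000));
}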

Code example source: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize2() {
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .bufferTimeout(5, Duration.ofMillis(2000));
}

Code example source: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow() {
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .bufferTimeout(5, Duration.ofMillis(100));
}

Code example source: reactor/reactor-core

@Test(timeout = 5000)
public void normal2() {
  Queue<Integer> q = new ArrayBlockingQueue<>(1);
  List<Integer> values = new ArrayList<>();
  for (Integer i : Flux.range(1, 10)
             .toIterable(1, () -> q)) {
    values.add(i);
  }
  Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}

Code example source: reactor/reactor-core

@Test(timeout = 5000)
public void toStream() {
  List<Integer> values = new ArrayList<>();
  Flux.range(1, 10)
    .toStream()
    .forEach(values::add);
  Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}

Code example source: spring-projects/spring-framework

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  this.client.execute(getUrl("/echo"), session -> session
      .send(input.map(session::textMessage))
      .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
      .subscribeWith(output)
      .then())
      .block(TIMEOUT);
  assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}

Code example source: reactor/reactor-core

@Test
public void largerSkip() {
  AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
  Flux.range(1, 10).buffer(2, 3).subscribe(ts);
  ts.assertValues(Arrays.asList(1, 2),
      Arrays.asList(4, 5),
      Arrays.asList(7, 8),
      Arrays.asList(10))
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void scanOperatorNullTags() throws Exception {
  Flux<Integer> source = Flux.range(1, 4);
  FluxNameFuseable<Integer> test = new FluxNameFuseable<>(source, "foo", null);
  assertThat(test.scan(Scannable.Attr.TAGS)).isNull();
}

Code example source: reactor/reactor-core

@Test
public void largerSkipEven() {
  AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
  Flux.range(1, 8).buffer(2, 3).subscribe(ts);
  ts.assertValues(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8))
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void normal1() {
  StepVerifier.create(Flux.range(1, 5)
              .onBackpressureBuffer(Duration.ofMinutes(1), Integer.MAX_VALUE, v -> {}))
        .expectNext(1, 2, 3, 4, 5)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void bufferLimit() {
  // `this` and `evicted` come from the enclosing test class (not shown here);
  // `this` is presumably passed as the eviction callback that fills `evicted`.
  StepVerifier.create(Flux.range(1, 5)
      .onBackpressureBuffer(Duration.ofMinutes(1), 1, this, Schedulers.single()),
      0)
        .expectSubscription()
        .expectNoEvent(Duration.ofMillis(100))
        .thenRequest(1)
        .expectNext(5)
        .verifyComplete();
  assertThat(evicted).containsExactly(1, 2, 3, 4);
}

Code example source: reactor/reactor-core

@Test
public void prefetchMaxTranslatesToUnboundedRequest() {
  AtomicLong requested = new AtomicLong();
  StepVerifier.create(Flux.just(1, 2, 3).hide()
              .doOnRequest(requested::set)
              .concatMap(i -> Flux.range(0, i), Integer.MAX_VALUE))
        .expectNext(0, 0, 1, 0, 1, 2)
        .verifyComplete();
  assertThat(requested.get())
      .isNotEqualTo(Integer.MAX_VALUE)
      .isEqualTo(Long.MAX_VALUE);
}

Code example source: reactor/reactor-core

@Test
public void namedHideFluxTest() {
  Flux<Integer> named1 =
      Flux.range(1, 10)
        .hide()
        .name("100s");
  Flux<Integer> named2 = named1.filter(i -> i % 3 == 0)
                 .name("multiple of 3 100s")
                 .hide();
  assertThat(Scannable.from(named1).name()).isEqualTo("100s");
  assertThat(Scannable.from(named2).name()).isEqualTo("multiple of 3 100s");
}

Code example source: reactor/reactor-core

@Test
public void blockingLast() {
  Assert.assertEquals((Integer) 10,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockLast());
}

Code example source: reactor/reactor-core

@Test
public void normalBoundary2() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .hide()
    .concatMapDelayError(v -> Flux.range(v, 2))
    .subscribe(ts);
  ts.assertValues(1, 2, 2, 3)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void expectNextSequenceWithPartialMatchingSequenceNoMoreExpectation() {
  assertThatExceptionOfType(AssertionError.class)
      .isThrownBy(() -> StepVerifier.create(Flux.range(1, 5))
        .expectNextSequence(Arrays.asList(1, 2, 3))
        .verifyComplete())
      .withMessage("expectation \"expectComplete\" failed (expected: onComplete(); actual: onNext(4))");
}

Code example source: reactor/reactor-core

@Test
public void requestTrackingDisabledIfNotNamed() {
  Flux<Integer> source = Flux.range(1, 10)
                .hide();
  new FluxMetrics<>(source, registry)
      .blockLast();
  DistributionSummary meter = registry.find(METER_REQUESTED)
                    .summary();
  if (meter != null) { //meter could be null in some tests
    assertThat(meter.count()).isZero();
  }
}

Code example source: reactor/reactor-core

@Test
public void neverTriggered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .delaySubscription(Flux.never())
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
}
