Usage and code examples of the reactor.core.publisher.Flux.transform() method

x33g5p2x, reposted on 2022-01-19 under Other

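This article collects code examples of the Java method reactor.core.publisher.Flux.transform() and shows how Flux.transform() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. The details of Flux.transform() are as follows:
Fully qualified class: reactor.core.publisher.Flux
Class name: Flux
Method name: transform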

Introduction to Flux.transform

Transforms this Flux in order to generate a target Flux. Unlike #compose(Function) (renamed transformDeferred in Reactor 3.3), the provided function is executed as part of assembly, that is, once when the chain is built rather than once per subscriber.

Function<Flux<Integer>, Flux<Integer>> applySchedulers = flux -> flux.subscribeOn(Schedulers.elastic())
    .publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();
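To make the assembly-time point concrete, the following sketch counts how often the function passed to transform runs when the resulting Flux is subscribed twice, and contrasts that with transformDeferred (the name compose has carried since Reactor 3.3). It assumes reactor-core 3.3 or later on the classpath; the class TransformTimingDemo and its helper methods are made up for illustration and are not part of Reactor.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
class TransformTimingDemo {

    // Reusable operator chain; the counter records each time the function is applied.
    static Function<Flux<Integer>, Flux<Integer>> squareEvens(AtomicInteger applied) {
        return flux -> {
            applied.incrementAndGet();
            return flux.filter(v -> v % 2 == 0).map(v -> v * v);
        };
    }

    // transform: the function runs while the chain is assembled.
    static int applicationsWithTransform() {
        AtomicInteger applied = new AtomicInteger();
        Flux<Integer> flux = Flux.range(1, 4).transform(squareEvens(applied));
        flux.collectList().block(); // first subscription
        flux.collectList().block(); // second subscription
        return applied.get();
    }

    // transformDeferred: the function runs again for every subscriber.
    static int applicationsWithTransformDeferred() {
        AtomicInteger applied = new AtomicInteger();
        Flux<Integer> flux = Flux.range(1, 4).transformDeferred(squareEvens(applied));
        flux.collectList().block(); // first subscription
        flux.collectList().block(); // second subscription
        return applied.get();
    }

    // The values produced are the same either way.
    static List<Integer> result() {
        return Flux.range(1, 4)
                .transform(squareEvens(new AtomicInteger()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(result());                            // prints [4, 16]
        System.out.println(applicationsWithTransform());         // prints 1
        System.out.println(applicationsWithTransformDeferred()); // prints 2
    }
}
```

In this sketch the choice only matters when the function has per-subscription state or side effects; for a stateless operator chain like the one above, transform is usually sufficient.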

Code examples

Code example source: codecentric/spring-boot-admin

// Builds a function that rewrites a ClientResponse: the body is passed through
// bodConverter via transform() and the Content-Type header is replaced by contentType.
private static Function<ClientResponse, Mono<ClientResponse>> convertClientResponse(Function<Flux<DataBuffer>, Flux<DataBuffer>> bodConverter,
                                          MediaType contentType) {
  return response -> {
    ClientResponse convertedResponse = ClientResponse.from(response).headers(headers -> {
      headers.replace(HttpHeaders.CONTENT_TYPE, singletonList(contentType.toString()));
      // The converted body may differ in size, so the old Content-Length is dropped.
      headers.remove(HttpHeaders.CONTENT_LENGTH);
    }).body(response.bodyToFlux(DataBuffer.class).transform(bodConverter)).build();
    return Mono.just(convertedResponse);
  };
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(BulkheadOperator.of(bulkhead))
          .transform(RateLimiterOperator.of(rateLimiter))
          .transform(CircuitBreakerOperator.of(circuitBreaker))
  ).expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
}
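Chained transform calls like the three above apply their functions in call order, so data passes through them in that order, and the chain is equivalent to a single transform with the functions composed via Function.andThen. A minimal sketch of that ordering, with a plain tagging function standing in for the resilience4j operators (it assumes reactor-core on the classpath; TransformOrderDemo and tag are hypothetical names, not resilience4j or Reactor APIs):

```java
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor or resilience4j.
class TransformOrderDemo {

    // Stand-in for an operator: appends a label to every element it sees.
    static Function<Flux<String>, Flux<String>> tag(String label) {
        return flux -> flux.map(v -> v + ">" + label);
    }

    // Three separate transform calls, in the same order as the test above.
    static List<String> chained() {
        return Flux.just("Event 1", "Event 2")
                .transform(tag("bulkhead"))
                .transform(tag("rateLimiter"))
                .transform(tag("circuitBreaker"))
                .collectList()
                .block();
    }

    // One transform call with the three functions composed up front.
    static List<String> composed() {
        Function<Flux<String>, Flux<String>> all =
                tag("bulkhead").andThen(tag("rateLimiter")).andThen(tag("circuitBreaker"));
        return Flux.just("Event 1", "Event 2")
                .transform(all)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // prints [Event 1>bulkhead>rateLimiter>circuitBreaker, Event 2>bulkhead>rateLimiter>circuitBreaker]
        System.out.println(chained());
        System.out.println(chained().equals(composed())); // prints true
    }
}
```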

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(CircuitBreakerOperator.of(circuitBreaker))
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
          .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
  ).expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"), true)
          .transform(CircuitBreakerOperator.of(circuitBreaker))
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
          .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
  ).expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvents() {
  StepVerifier.create(
      Flux.just("Event 1", "Event 2")
          .transform(BulkheadOperator.of(bulkhead))
          .transform(RateLimiterOperator.of(rateLimiter))
          .transform(CircuitBreakerOperator.of(circuitBreaker))
  ).expectNext("Event 1")
      .expectNext("Event 2")
      .verifyComplete();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(BulkheadOperator.of(bulkhead)))
      .expectSubscription()
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertSingleFailedCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateErrorWhenErrorNotOnSubscribe() {
  StepVerifier.create(
      Flux.error(new IOException("BAM!"), true)
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertSingleFailedCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() {
  bulkhead.isCallPermitted();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
      .expectSubscription()
      .expectError(BulkheadFullException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
  assertNoRegisteredCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
  bulkhead.isCallPermitted();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"), true)
          .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
      .expectSubscription()
      .expectError(BulkheadFullException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"), true)
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
  assertNoRegisteredCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(RateLimiterOperator.of(rateLimiter)))
      .expectSubscription()
      .expectError(IOException.class)
      .verify(Duration.ofSeconds(1));
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithBulkheadFullException() {
  bulkhead.isCallPermitted();
  StepVerifier.create(
      Flux.just("Event")
          .transform(BulkheadOperator.of(bulkhead)))
      .expectSubscription()
      .expectError(BulkheadFullException.class)
      .verify(Duration.ofSeconds(1));
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Flux.just("Event 1", "Event 2")
          .transform(BulkheadOperator.of(bulkhead)))
      .expectNext("Event 1")
      .expectNext("Event 2")
      .verifyComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe() {
  saturateRateLimiter();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"))
          .transform(RateLimiterOperator.of(rateLimiter)))
      .expectSubscription()
      .expectError(RequestNotPermitted.class)
      .verify(Duration.ofSeconds(1));
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorNotOnSubscribe() {
  saturateRateLimiter();
  StepVerifier.create(
      Flux.error(new IOException("BAM!"), true)
          .transform(RateLimiterOperator.of(rateLimiter)))
      .expectSubscription()
      .expectError(RequestNotPermitted.class)
      .verify(Duration.ofSeconds(1));
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
  circuitBreaker.transitionToOpenState();
  StepVerifier.create(
      Flux.just("Event 1", "Event 2")
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectError(CircuitBreakerOpenException.class)
      .verify(Duration.ofSeconds(1));
  assertNoRegisteredCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitRequestNotPermittedException() {
  saturateRateLimiter();
  StepVerifier.create(
      Flux.just("Event")
          .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
      .expectSubscription()
      .expectError(RequestNotPermitted.class)
      .verify(Duration.ofSeconds(1));
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Flux.just("Event 1", "Event 2")
          .transform(CircuitBreakerOperator.of(circuitBreaker)))
      .expectNext("Event 1")
      .expectNext("Event 2")
      .verifyComplete();
  assertSingleSuccessfulCall();
}
