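This article collects code examples for the Java method reactor.core.publisher.Flux.transform() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are meant as practical reference material. Details of Flux.transform() are as follows: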
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: transform
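Description: Transform this Flux in order to generate a target Flux. Unlike #compose(Function), the provided function is executed as part of assembly, that is, once when the chain is built rather than once per subscriber. (In Reactor 3.3 and later, #compose(Function) is named #transformDeferred(Function).)
// The Integer element type is assumed here; the generic parameters were lost in extraction.
Function<Flux<Integer>, Flux<Integer>> applySchedulers = flux -> flux.subscribeOn(Schedulers.elastic())
                                                                     .publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();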
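To make "executed as part of assembly" concrete, here is a minimal, library-free sketch. ToySequence is a hypothetical stand-in written for this illustration and is not a Reactor type. It only models the one property described above: transform applies the function once when the chain is built, while the deferred variant applies it again for every subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class TransformTimingDemo {
    /** Subscribes once and gathers everything the sequence emits. */
    static List<Integer> collect(ToySequence<Integer> seq) {
        List<Integer> out = new ArrayList<>();
        seq.subscribe(out::add);
        return out;
    }

    /** Builds one pipeline, subscribes N times, and reports how often the function ran. */
    static int countApplications(boolean deferred, int subscribers) {
        AtomicInteger applications = new AtomicInteger();
        Function<ToySequence<Integer>, ToySequence<Integer>> square = seq -> {
            applications.incrementAndGet();
            return seq.map(v -> v * v);
        };
        ToySequence<Integer> source = ToySequence.just(1, 2, 3);
        ToySequence<Integer> pipeline;
        if (deferred) {
            pipeline = source.transformDeferred(square);
        } else {
            pipeline = source.transform(square);
        }
        for (int i = 0; i < subscribers; i++) {
            collect(pipeline);
        }
        return applications.get();
    }

    public static void main(String[] args) {
        System.out.println(countApplications(false, 3)); // prints 1
        System.out.println(countApplications(true, 3));  // prints 3
    }
}

/** Toy cold sequence (hypothetical, not Reactor): each subscribe() replays the items. */
final class ToySequence<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    private ToySequence(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    @SafeVarargs
    static <T> ToySequence<T> just(T... items) {
        List<T> copy = List.of(items);
        return new ToySequence<T>(sink -> copy.forEach(sink));
    }

    <R> ToySequence<R> map(Function<? super T, ? extends R> mapper) {
        return new ToySequence<R>(sink -> onSubscribe.accept(item -> sink.accept(mapper.apply(item))));
    }

    /** Assembly-time variant: the function runs exactly once, right here. */
    <R> ToySequence<R> transform(Function<? super ToySequence<T>, ToySequence<R>> fn) {
        return fn.apply(this);
    }

    /** Deferred variant: the function runs again for each subscriber. */
    <R> ToySequence<R> transformDeferred(Function<? super ToySequence<T>, ToySequence<R>> fn) {
        return new ToySequence<R>(sink -> fn.apply(this).subscribe(sink));
    }

    void subscribe(Consumer<T> sink) {
        onSubscribe.accept(sink);
    }
}
```

The practical consequence is that a function holding per-subscriber state (a counter, a timestamp) is usually better suited to the deferred variant, whereas a stateless bundle of operators, like the applySchedulers function above, fits transform.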
Code example source: codecentric/spring-boot-admin
private static Function<ClientResponse, Mono<ClientResponse>> convertClientResponse(
        Function<Flux<DataBuffer>, Flux<DataBuffer>> bodConverter,
        MediaType contentType) {
    return response -> {
        // Replace Content-Type, drop Content-Length, and pipe the body through the converter.
        ClientResponse convertedResponse = ClientResponse.from(response).headers(headers -> {
            headers.replace(HttpHeaders.CONTENT_TYPE, singletonList(contentType.toString()));
            headers.remove(HttpHeaders.CONTENT_LENGTH);
        }).body(response.bodyToFlux(DataBuffer.class).transform(bodConverter)).build();
        return Mono.just(convertedResponse);
    };
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    // The source error is expected to pass through all three chained operators.
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(BulkheadOperator.of(bulkhead))
            .transform(RateLimiterOperator.of(rateLimiter))
            .transform(CircuitBreakerOperator.of(circuitBreaker))
    ).expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
    // With the circuit breaker open, its exception is expected instead of the source error.
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(CircuitBreakerOperator.of(circuitBreaker))
            .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
            .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
    ).expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
        // whenRequested = true: the error is emitted on request rather than directly on subscribe.
        Flux.error(new IOException("BAM!"), true)
            .transform(CircuitBreakerOperator.of(circuitBreaker))
            .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
            .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
    ).expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvents() {
    StepVerifier.create(
        Flux.just("Event 1", "Event 2")
            .transform(BulkheadOperator.of(bulkhead))
            .transform(RateLimiterOperator.of(rateLimiter))
            .transform(CircuitBreakerOperator.of(circuitBreaker))
    ).expectNext("Event 1")
        .expectNext("Event 2")
        .verifyComplete();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(BulkheadOperator.of(bulkhead)))
        .expectSubscription()
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    // The permit is expected to be available again after the error.
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    assertSingleFailedCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateErrorWhenErrorNotOnSubscribe() {
    StepVerifier.create(
        // whenRequested = true: the error is emitted on request rather than directly on subscribe.
        Flux.error(new IOException("BAM!"), true)
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    assertSingleFailedCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() {
    // Take the bulkhead's permit up front (the assertions suggest a single-permit bulkhead).
    bulkhead.isCallPermitted();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
        .expectSubscription()
        .expectError(BulkheadFullException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
    assertNoRegisteredCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
    // Take the bulkhead's permit up front (the assertions suggest a single-permit bulkhead).
    bulkhead.isCallPermitted();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"), true)
            .transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
        .expectSubscription()
        .expectError(BulkheadFullException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"), true)
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
    assertNoRegisteredCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldPropagateError() {
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(RateLimiterOperator.of(rateLimiter)))
        .expectSubscription()
        .expectError(IOException.class)
        .verify(Duration.ofSeconds(1));
    assertSinglePermitUsed();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithBulkheadFullException() {
    // Take the bulkhead's permit up front (the assertions suggest a single-permit bulkhead).
    bulkhead.isCallPermitted();
    StepVerifier.create(
        Flux.just("Event")
            .transform(BulkheadOperator.of(bulkhead)))
        .expectSubscription()
        .expectError(BulkheadFullException.class)
        .verify(Duration.ofSeconds(1));
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    StepVerifier.create(
        Flux.just("Event 1", "Event 2")
            .transform(BulkheadOperator.of(bulkhead)))
        .expectNext("Event 1")
        .expectNext("Event 2")
        .verifyComplete();
    // The permit is expected to be available again once the sequence completes.
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe() {
    saturateRateLimiter();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"))
            .transform(RateLimiterOperator.of(rateLimiter)))
        .expectSubscription()
        .expectError(RequestNotPermitted.class)
        .verify(Duration.ofSeconds(1));
    assertNoPermitLeft();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorNotOnSubscribe() {
    saturateRateLimiter();
    StepVerifier.create(
        Flux.error(new IOException("BAM!"), true)
            .transform(RateLimiterOperator.of(rateLimiter)))
        .expectSubscription()
        .expectError(RequestNotPermitted.class)
        .verify(Duration.ofSeconds(1));
    assertNoPermitLeft();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
    circuitBreaker.transitionToOpenState();
    StepVerifier.create(
        Flux.just("Event 1", "Event 2")
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectError(CircuitBreakerOpenException.class)
        .verify(Duration.ofSeconds(1));
    assertNoRegisteredCall();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitRequestNotPermittedException() {
    saturateRateLimiter();
    StepVerifier.create(
        Flux.just("Event")
            .transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
        .expectSubscription()
        .expectError(RequestNotPermitted.class)
        .verify(Duration.ofSeconds(1));
    assertNoPermitLeft();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    StepVerifier.create(
        Flux.just("Event 1", "Event 2")
            .transform(CircuitBreakerOperator.of(circuitBreaker)))
        .expectNext("Event 1")
        .expectNext("Event 2")
        .verifyComplete();
    assertSingleSuccessfulCall();
}