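This article collects code examples for the Java method reactor.core.publisher.Flux.cast() and shows how Flux.cast() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of the Flux.cast() method are as follows: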
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: cast
Description: Cast the current Flux produced type into a target produced type.
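The per-element effect of a cast can be sketched without Reactor, using java.util.stream as a stand-in for Flux. This is only an analogy: it assumes that cast applies Class.cast to each element, which matches the tests quoted later in this article but is not stated in the description itself. The class CastSketch and the method castAll are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Reactor: a Stream-based analogy for Flux.cast().
final class CastSketch {

    // Casts every element to the target type; the element values are unchanged,
    // only the static type of the resulting list differs.
    static <T> List<T> castAll(List<?> source, Class<T> target) {
        return source.stream()
                .map(target::cast) // throws ClassCastException on a mismatching element
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> raw = Arrays.asList("a", "bc");
        List<String> typed = castAll(raw, String.class);
        // String methods are now available without an explicit cast at the call site.
        System.out.println(typed.get(1).length());
    }
}
```

The point of the sketch is that a cast changes what the compiler knows about the elements, not the elements themselves.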
Code example source: reactor/reactor-core
/**
 * Evaluate each accepted value against the given {@link Class} type. If
 * a value matches the type, it is passed into the resulting {@link Flux}. Otherwise
 * the value is ignored and a request of 1 is emitted.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/ofTypeForFlux.svg" alt="">
 *
 * @param clazz the {@link Class} type to test values against
 *
 * @return a new {@link Flux} filtered on items of the requested type
 */
public final <U> Flux<U> ofType(final Class<U> clazz) {
    Objects.requireNonNull(clazz, "clazz");
    return filter(o -> clazz.isAssignableFrom(o.getClass())).cast(clazz);
}
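The ofType snippet above combines a type filter with a cast, so mismatching elements are dropped instead of causing an error. A minimal sketch of the same filter-then-cast idea, again using java.util.stream rather than Reactor, might look like this. OfTypeSketch is a hypothetical name, and Class.isInstance is used here as the equivalent of the isAssignableFrom(o.getClass()) check for non-null elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Reactor: filter by type first, then cast.
final class OfTypeSketch {

    static <T> List<T> ofType(List<?> source, Class<T> target) {
        return source.stream()
                .filter(target::isInstance) // drop elements that are not instances of target
                .map(target::cast)          // safe now: every remaining element matches
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList(1, "two", 3.0, "four");
        System.out.println(ofType(mixed, String.class)); // prints [two, four]
        System.out.println(ofType(mixed, Number.class)); // prints [1, 3.0]
    }
}
```

A plain cast is appropriate when every element is expected to have the target type; the filtering variant fits streams that legitimately mix types.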
Code example source: line/armeria
ArmeriaClientHttpResponse(com.linecorp.armeria.common.HttpHeaders headers,
                          ResponseBodyPublisher publisher,
                          DataBufferFactoryWrapper<?> factoryWrapper) {
    this.headers = requireNonNull(headers, "headers");
    final com.linecorp.armeria.common.HttpStatus status = headers.status();
    checkArgument(status != null, "no status in HTTP headers");
    this.status = status;
    body = Flux.from(publisher).cast(HttpData.class).map(factoryWrapper::toDataBuffer);
}
Code example source: codecentric/spring-boot-admin
public void start() {
    subscription = Flux.from(publisher)
                       .log(log.getName(), Level.FINEST)
                       .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
                       .ofType(eventType)
                       .cast(eventType)
                       .compose(this::handle)
                       .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
                       .subscribe();
}
Code example source: line/armeria
ArmeriaServerHttpRequest(ServiceRequestContext ctx,
                         HttpRequest req,
                         DataBufferFactoryWrapper<?> factoryWrapper) {
    super(URI.create(requireNonNull(req, "req").path()),
          null,
          fromArmeriaHttpHeaders(req.headers()));
    this.ctx = requireNonNull(ctx, "ctx");
    this.req = req;
    body = Flux.from(req).cast(HttpData.class).map(factoryWrapper::toDataBuffer)
               // Guarantee that the context is accessible from a controller method
               // when a user specifies @RequestBody in order to convert a request body into an object.
               .publishOn(Schedulers.fromExecutor(ctx.contextAwareExecutor()));
}
Code example source: spring-projects/spring-framework
@Override
public Mono<Void> handle(WebSocketSession session) {
    return session.send(Flux
            .error(new Throwable())
            .onErrorResume(ex -> session.close(CloseStatus.GOING_AWAY)) // SPR-17306 (nested close)
            .cast(WebSocketMessage.class));
}
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void sourceNull() {
    Flux.just(1)
        .cast(null);
}
Code example source: spring-projects/spring-framework
@Test
public void readString() {
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .body(Mono.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n")));
    Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
            request, Collections.emptyMap()).cast(String.class);
    StepVerifier.create(data)
            .expectNextMatches(elem -> elem.equals("foo\nbar"))
            .expectNextMatches(elem -> elem.equals("baz"))
            .expectComplete()
            .verify();
}
Code example source: spring-projects/spring-framework
@Test
public void readError() {
    Flux<DataBuffer> body =
            Flux.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n"))
                .concatWith(Flux.error(new RuntimeException()));
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .body(body);
    Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
            request, Collections.emptyMap()).cast(String.class);
    StepVerifier.create(data)
            .expectNextMatches(elem -> elem.equals("foo\nbar"))
            .expectNextMatches(elem -> elem.equals("baz"))
            .expectError()
            .verify();
}
Code example source: reactor/reactor-core
@Test
public void error() {
    StepVerifier.create(Flux.just(1)
                            .cast(String.class))
                .verifyError(ClassCastException.class);
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    StepVerifier.create(Flux.just(1)
                            .cast(Number.class))
                .expectNext(1)
                .verifyComplete();
}
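The two reactor-core tests above contrast a cast to a supertype (Integer to Number, which succeeds) with a cast to an unrelated type (Integer to String, which ends in a ClassCastException). The underlying rule can be sketched with plain Class.cast. CastOutcomeSketch and castSucceeds are hypothetical names, and the sketch throws and catches the exception directly, whereas the Flux in the test reports it as an error signal to the subscriber.

```java
// Hypothetical helper, not part of Reactor: shows which casts Class.cast accepts.
final class CastOutcomeSketch {

    // Returns true if value can be cast to target, false if Class.cast rejects it.
    static boolean castSucceeds(Object value, Class<?> target) {
        try {
            target.cast(value);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(castSucceeds(1, Number.class)); // prints true  (Integer is a Number)
        System.out.println(castSucceeds(1, String.class)); // prints false (Integer is not a String)
    }
}
```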
Code example source: line/armeria
Flux.from(httpRequest).cast(HttpData.class).map(HttpData::toStringUtf8);
StepVerifier.create(requestBody, 1)
            .expectNext("a").thenRequest(1)
Code example source: spring-projects/spring-framework
@Test
public void readPojo() {
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .body(Mono.just(stringBuffer(
                    "data:{\"foo\": \"foofoo\", \"bar\": \"barbar\"}\n\n" +
                    "data:{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}\n\n")));
    Flux<Pojo> data = messageReader.read(ResolvableType.forClass(Pojo.class), request,
            Collections.emptyMap()).cast(Pojo.class);
    StepVerifier.create(data)
            .consumeNextWith(pojo -> {
                assertEquals("foofoo", pojo.getFoo());
                assertEquals("barbar", pojo.getBar());
            })
            .consumeNextWith(pojo -> {
                assertEquals("foofoofoo", pojo.getFoo());
                assertEquals("barbarbar", pojo.getBar());
            })
            .expectComplete()
            .verify();
}
Code example source: spring-projects/spring-framework
@Test
public void readServerSentEvents() {
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .body(Mono.just(stringBuffer(
                    "id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" +
                    "id:c43\nevent:bar\nretry:456\ndata:baz\n\n")));
    Flux<ServerSentEvent> events = this.messageReader
            .read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
                    request, Collections.emptyMap()).cast(ServerSentEvent.class);
    StepVerifier.create(events)
            .consumeNextWith(event -> {
                assertEquals("c42", event.id());
                assertEquals("foo", event.event());
                assertEquals(Duration.ofMillis(123), event.retry());
                assertEquals("bla\nbla bla\nbla bla bla", event.comment());
                assertEquals("bar", event.data());
            })
            .consumeNextWith(event -> {
                assertEquals("c43", event.id());
                assertEquals("bar", event.event());
                assertEquals(Duration.ofMillis(456), event.retry());
                assertNull(event.comment());
                assertEquals("baz", event.data());
            })
            .expectComplete()
            .verify();
}
Code example source: spring-projects/spring-framework
@Test
public void readServerSentEventsWithMultipleChunks() {
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .body(Flux.just(
                    stringBuffer("id:c42\nev"),
                    stringBuffer("ent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:"),
                    stringBuffer("bar\n\nid:c43\nevent:bar\nretry:456\ndata:baz\n\n")));
    Flux<ServerSentEvent> events = messageReader
            .read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
                    request, Collections.emptyMap()).cast(ServerSentEvent.class);
    StepVerifier.create(events)
            .consumeNextWith(event -> {
                assertEquals("c42", event.id());
                assertEquals("foo", event.event());
                assertEquals(Duration.ofMillis(123), event.retry());
                assertEquals("bla\nbla bla\nbla bla bla", event.comment());
                assertEquals("bar", event.data());
            })
            .consumeNextWith(event -> {
                assertEquals("c43", event.id());
                assertEquals("bar", event.event());
                assertEquals(Duration.ofMillis(456), event.retry());
                assertNull(event.comment());
                assertEquals("baz", event.data());
            })
            .expectComplete()
            .verify();
}
Code example source: mulesoft/mule
/**
 * Processes the request phase before the next message processor is invoked.
 *
 * @return function that performs request processing
 */
protected ReactiveProcessor processRequest() {
    return stream -> from(stream).cast(PrivilegedEvent.class).map(event -> {
        try {
            setCurrentEvent(event);
            return processRequest(event);
        } catch (MuleException e) {
            throw propagate(e);
        }
    });
}
Code example source: spring-projects/spring-integration
@Test
public void testReactiveInboundChannelAdapter() {
    Flux<Integer> testFlux =
            Flux.from(this.fluxMessageChannel)
                .map(Message::getPayload)
                .cast(Integer.class);
    StepVerifier.create(testFlux)
            .expectNext(2, 4, 6, 8, 10, 12, 14, 16)
            .thenCancel()
            .verify();
}
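The spring-integration test above maps each message to its payload and then casts, which is a common reason to reach for a cast: a wrapper with a wildcard type parameter only exposes its payload as Object. A rough stdlib-only sketch of that situation follows. Envelope is a hypothetical stand-in for a Message-like wrapper, PayloadCastSketch and payloads are illustrative names, and java.util.stream again replaces Flux.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Spring or Reactor code: recovering a payload type lost to a wildcard.
final class PayloadCastSketch {

    // Hypothetical stand-in for a generic message wrapper.
    static final class Envelope<T> {
        private final T payload;

        Envelope(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    // Envelope<?> hides the payload type, so the payloads come out as Object
    // and must be cast back to the type the caller expects.
    static <T> List<T> payloads(List<Envelope<?>> envelopes, Class<T> target) {
        return envelopes.stream()
                .<Object>map(Envelope::getPayload) // static type is Object at this point
                .map(target::cast)                 // fails fast if a payload has another type
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Envelope<?>> in = Arrays.<Envelope<?>>asList(
                new Envelope<String>("x"), new Envelope<String>("y"));
        System.out.println(payloads(in, String.class)); // prints [x, y]
    }
}
```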
Code example source: spring-projects/spring-integration
@Test
public void splitStreamReactive() {
    Message<?> message = new GenericMessage<>(Stream.of("x", "y", "z"));
    FluxMessageChannel replyChannel = new FluxMessageChannel();
    DefaultMessageSplitter splitter = new DefaultMessageSplitter();
    splitter.setOutputChannel(replyChannel);
    splitter.handleMessage(message);
    Flux<String> testFlux =
            Flux.from(replyChannel)
                .map(Message::getPayload)
                .cast(String.class);
    StepVerifier.create(testFlux)
            .expectNext("x", "y", "z")
            .then(() ->
                    ((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
                            .onComplete())
            .verifyComplete();
}
Code example source: spring-projects/spring-integration
@Test
public void splitArrayPayloadReactive() {
    Message<?> message = new GenericMessage<>(new String[] { "x", "y", "z" });
    FluxMessageChannel replyChannel = new FluxMessageChannel();
    DefaultMessageSplitter splitter = new DefaultMessageSplitter();
    splitter.setOutputChannel(replyChannel);
    splitter.handleMessage(message);
    Flux<String> testFlux =
            Flux.from(replyChannel)
                .map(Message::getPayload)
                .cast(String.class);
    StepVerifier.create(testFlux)
            .expectNext("x", "y", "z")
            .then(() ->
                    ((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
                            .onComplete())
            .verifyComplete();
}
Code example source: spring-projects/spring-integration
@Test
public void splitFluxReactive() {
    Message<?> message = new GenericMessage<>(Flux.just("x", "y", "z"));
    FluxMessageChannel replyChannel = new FluxMessageChannel();
    DefaultMessageSplitter splitter = new DefaultMessageSplitter();
    splitter.setOutputChannel(replyChannel);
    splitter.handleMessage(message);
    Flux<String> testFlux =
            Flux.from(replyChannel)
                .map(Message::getPayload)
                .cast(String.class);
    StepVerifier.create(testFlux)
            .expectNext("x", "y", "z")
            .then(() ->
                    ((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
                            .onComplete())
            .verifyComplete();
}