reactor.core.publisher.Flux.cast()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(250)

本文整理了Java中reactor.core.publisher.Flux.cast()方法的一些代码示例,展示了Flux.cast()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.cast()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:cast

Flux.cast介绍

[英]Cast the current Flux produced type into a target produced type.
[中]将电流通量产生类型转换为目标产生类型。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Evaluate each accepted value against the given {@link Class} type. If the
 * a value matches the type, it is passed into the resulting {@link Flux}. Otherwise
 * the value is ignored and a request of 1 is emitted.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/ofTypeForFlux.svg" alt="">
 *
 * @param clazz the {@link Class} type to test values against
 *
 * @return a new {@link Flux} filtered on items of the requested type
 */
public final <U> Flux<U> ofType(final Class<U> clazz) {
    Objects.requireNonNull(clazz, "clazz");
    return filter(o -> clazz.isAssignableFrom(o.getClass())).cast(clazz);
}

代码示例来源:origin: line/armeria

ArmeriaClientHttpResponse(com.linecorp.armeria.common.HttpHeaders headers,
             ResponseBodyPublisher publisher,
             DataBufferFactoryWrapper<?> factoryWrapper) {
  this.headers = requireNonNull(headers, "headers");
  final com.linecorp.armeria.common.HttpStatus status = headers.status();
  checkArgument(status != null, "no status in HTTP headers");
  this.status = status;
  body = Flux.from(publisher).cast(HttpData.class).map(factoryWrapper::toDataBuffer);
}

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  subscription = Flux.from(publisher)
            .log(log.getName(), Level.FINEST)
            .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
            .ofType(eventType)
            .cast(eventType)
            .compose(this::handle)
            .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
            .subscribe();
}

代码示例来源:origin: line/armeria

ArmeriaServerHttpRequest(ServiceRequestContext ctx,
             HttpRequest req,
             DataBufferFactoryWrapper<?> factoryWrapper) {
  super(URI.create(requireNonNull(req, "req").path()),
     null,
     fromArmeriaHttpHeaders(req.headers()));
  this.ctx = requireNonNull(ctx, "ctx");
  this.req = req;
  body = Flux.from(req).cast(HttpData.class).map(factoryWrapper::toDataBuffer)
        // Guarantee that the context is accessible from a controller method
        // when a user specify @RequestBody in order to convert a request body into an object.
        .publishOn(Schedulers.fromExecutor(ctx.contextAwareExecutor()));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    return session.send(Flux
        .error(new Throwable())
        .onErrorResume(ex -> session.close(CloseStatus.GOING_AWAY)) // SPR-17306 (nested close)
        .cast(WebSocketMessage.class));
  }
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void sourceNull() {
  Flux.just(1)
    .cast(null);
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readString() {
  MockServerHttpRequest request = MockServerHttpRequest.post("/")
      .body(Mono.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n")));
  Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
      request, Collections.emptyMap()).cast(String.class);
  StepVerifier.create(data)
      .expectNextMatches(elem -> elem.equals("foo\nbar"))
      .expectNextMatches(elem -> elem.equals("baz"))
      .expectComplete()
      .verify();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readError() {
  Flux<DataBuffer> body =
      Flux.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n"))
          .concatWith(Flux.error(new RuntimeException()));
  MockServerHttpRequest request = MockServerHttpRequest.post("/")
      .body(body);
  Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
      request, Collections.emptyMap()).cast(String.class);
  StepVerifier.create(data)
      .expectNextMatches(elem -> elem.equals("foo\nbar"))
      .expectNextMatches(elem -> elem.equals("baz"))
      .expectError()
      .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void error() {
  StepVerifier.create(Flux.just(1)
              .cast(String.class))
        .verifyError(ClassCastException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normal() {
  StepVerifier.create(Flux.just(1)
              .cast(Number.class))
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: line/armeria

Flux.from(httpRequest).cast(HttpData.class).map(HttpData::toStringUtf8);
StepVerifier.create(requestBody, 1)
      .expectNext("a").thenRequest(1)

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readPojo() {
  MockServerHttpRequest request = MockServerHttpRequest.post("/")
      .body(Mono.just(stringBuffer(
          "data:{\"foo\": \"foofoo\", \"bar\": \"barbar\"}\n\n" +
              "data:{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}\n\n")));
  Flux<Pojo> data = messageReader.read(ResolvableType.forClass(Pojo.class), request,
      Collections.emptyMap()).cast(Pojo.class);
  StepVerifier.create(data)
      .consumeNextWith(pojo -> {
        assertEquals("foofoo", pojo.getFoo());
        assertEquals("barbar", pojo.getBar());
      })
      .consumeNextWith(pojo -> {
        assertEquals("foofoofoo", pojo.getFoo());
        assertEquals("barbarbar", pojo.getBar());
      })
      .expectComplete()
      .verify();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readServerSentEvents() {
  MockServerHttpRequest request = MockServerHttpRequest.post("/")
      .body(Mono.just(stringBuffer(
          "id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" +
          "id:c43\nevent:bar\nretry:456\ndata:baz\n\n")));
  Flux<ServerSentEvent> events = this.messageReader
      .read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
          request, Collections.emptyMap()).cast(ServerSentEvent.class);
  StepVerifier.create(events)
      .consumeNextWith(event -> {
        assertEquals("c42", event.id());
        assertEquals("foo", event.event());
        assertEquals(Duration.ofMillis(123), event.retry());
        assertEquals("bla\nbla bla\nbla bla bla", event.comment());
        assertEquals("bar", event.data());
      })
      .consumeNextWith(event -> {
        assertEquals("c43", event.id());
        assertEquals("bar", event.event());
        assertEquals(Duration.ofMillis(456), event.retry());
        assertNull(event.comment());
        assertEquals("baz", event.data());
      })
      .expectComplete()
      .verify();
}

代码示例来源:origin: line/armeria

Flux.from(httpRequest).cast(HttpData.class).map(HttpData::toStringUtf8);
StepVerifier.create(requestBody, 1)
      .expectNext("a").thenRequest(1)

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readServerSentEventsWithMultipleChunks() {
  MockServerHttpRequest request = MockServerHttpRequest.post("/")
      .body(Flux.just(
          stringBuffer("id:c42\nev"),
          stringBuffer("ent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:"),
          stringBuffer("bar\n\nid:c43\nevent:bar\nretry:456\ndata:baz\n\n")));
  Flux<ServerSentEvent> events = messageReader
      .read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
          request, Collections.emptyMap()).cast(ServerSentEvent.class);
  StepVerifier.create(events)
      .consumeNextWith(event -> {
        assertEquals("c42", event.id());
        assertEquals("foo", event.event());
        assertEquals(Duration.ofMillis(123), event.retry());
        assertEquals("bla\nbla bla\nbla bla bla", event.comment());
        assertEquals("bar", event.data());
      })
      .consumeNextWith(event -> {
        assertEquals("c43", event.id());
        assertEquals("bar", event.event());
        assertEquals(Duration.ofMillis(456), event.retry());
        assertNull(event.comment());
        assertEquals("baz", event.data());
      })
      .expectComplete()
      .verify();
}

代码示例来源:origin: mulesoft/mule

/**
 * Processes the request phase before the next message processor is invoked.
 *
 * @return function that performs request processing
 */
protected ReactiveProcessor processRequest() {
 return stream -> from(stream).cast(PrivilegedEvent.class).map(event -> {
  try {
   setCurrentEvent(event);
   return processRequest(event);
  } catch (MuleException e) {
   throw propagate(e);
  }
 });
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testReactiveInboundChannelAdapter() {
  Flux<Integer> testFlux =
      Flux.from(this.fluxMessageChannel)
          .map(Message::getPayload)
          .cast(Integer.class);
  StepVerifier.create(testFlux)
      .expectNext(2, 4, 6, 8, 10, 12, 14, 16)
      .thenCancel()
      .verify();
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void splitStreamReactive() {
  Message<?> message = new GenericMessage<>(Stream.of("x", "y", "z"));
  FluxMessageChannel replyChannel = new FluxMessageChannel();
  DefaultMessageSplitter splitter = new DefaultMessageSplitter();
  splitter.setOutputChannel(replyChannel);
  splitter.handleMessage(message);
  Flux<String> testFlux =
      Flux.from(replyChannel)
          .map(Message::getPayload)
          .cast(String.class);
  StepVerifier.create(testFlux)
      .expectNext("x", "y", "z")
      .then(() ->
          ((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
              .onComplete())
      .verifyComplete();
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void splitArrayPayloadReactive() {
  Message<?> message = new GenericMessage<>(new String[] { "x", "y", "z" });
  FluxMessageChannel replyChannel = new FluxMessageChannel();
  DefaultMessageSplitter splitter = new DefaultMessageSplitter();
  splitter.setOutputChannel(replyChannel);
  splitter.handleMessage(message);
  Flux<String> testFlux =
      Flux.from(replyChannel)
          .map(Message::getPayload)
          .cast(String.class);
  StepVerifier.create(testFlux)
      .expectNext("x", "y", "z")
      .then(() ->
          ((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
              .onComplete())
      .verifyComplete();
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void splitFluxReactive() {
  Message<?> message = new GenericMessage<>(Flux.just("x", "y", "z"));
  FluxMessageChannel replyChannel = new FluxMessageChannel();
  DefaultMessageSplitter splitter = new DefaultMessageSplitter();
  splitter.setOutputChannel(replyChannel);
  splitter.handleMessage(message);
  Flux<String> testFlux =
      Flux.from(replyChannel)
          .map(Message::getPayload)
          .cast(String.class);
  StepVerifier.create(testFlux)
      .expectNext("x", "y", "z")
      .then(() ->
          ((Subscriber<?>) TestUtils.getPropertyValue(replyChannel, "subscribers", List.class).get(0))
              .onComplete())
      .verifyComplete();
}

相关文章

微信公众号

最新文章

更多

Flux类方法