Usage and code examples for the reactor.core.publisher.Mono class

x33g5p2x, reposted on 2022-01-24 in Other

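This article collects code examples for the Java class reactor.core.publisher.Mono and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven; they were extracted from selected projects and are intended as reference material. Details of the Mono class:
Package path: reactor.core.publisher.Mono
Class name: Mono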

Introduction to Mono

A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

The recommended way to learn about the Mono API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the "which operator do I need?" appendix.

The rx operators will offer aliases for input Mono type to preserve the "at most one" property of the resulting Mono. For instance Mono#flatMap returns a Mono, while there is a Mono#flatMapMany alias with possibly more than 1 emission.
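The difference between the two operators can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class name `FlatMapAliasDemo` and its helper methods are hypothetical and exist only for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapAliasDemo {

    // flatMap maps to another Mono, so the result stays "at most one".
    static Mono<Integer> lengthOf(Mono<String> word) {
        return word.flatMap(w -> Mono.just(w.length()));
    }

    // flatMapMany maps to a Publisher that may emit several items,
    // so the result type widens to Flux.
    static Flux<String> lettersOf(Mono<String> word) {
        return word.flatMapMany(w -> Flux.fromArray(w.split("")));
    }

    // Blocking helpers for the demo only; block() is best avoided in reactive pipelines.
    static int length(String s) {
        return lengthOf(Mono.just(s)).block();
    }

    static List<String> letters(String s) {
        return lettersOf(Mono.just(s)).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(length("mono"));   // 4
        System.out.println(letters("mono"));  // [m, o, n, o]
    }
}
```

The return types carry the cardinality: callers of `lengthOf` know they get zero or one value, while callers of `lettersOf` must be prepared for many.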

Mono<Void> should be used for a Publisher that just completes without any value.
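A small sketch of a Mono that only signals completion may help here. It assumes reactor-core is available; `MonoVoidDemo`, `markDone`, and `markDoneThenReport` are hypothetical names invented for the illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

final class MonoVoidDemo {

    // Hypothetical task with no result: only completion (or an error) is signalled.
    static Mono<Void> markDone(AtomicBoolean flag) {
        return Mono.fromRunnable(() -> flag.set(true));
    }

    // Chains a value-producing step that starts after the empty Mono completes.
    static Mono<String> markDoneThenReport(AtomicBoolean flag) {
        return markDone(flag).then(Mono.fromSupplier(() -> "done=" + flag.get()));
    }

    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false);
        Mono<Void> task = markDone(flag);
        System.out.println(flag.get());    // false: nothing runs before subscription
        System.out.println(task.block());  // null: the Mono completed without a value
        System.out.println(flag.get());    // true
        System.out.println(markDoneThenReport(new AtomicBoolean(false)).block()); // done=true
    }
}
```

Because an empty Mono never calls `onNext`, operators such as `map` or `flatMap` have nothing to act on; `then` is the usual way to continue after it completes.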

It is intended to be used in implementations and return types; input parameters should keep using raw Publisher as much as possible.
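One way to follow this advice is to accept a raw Publisher and narrow it with `Mono.from`. The sketch below assumes reactor-core (which brings in `org.reactivestreams.Publisher`); the class and method names are hypothetical.

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class PublisherInputDemo {

    // Accepts any Reactive Streams Publisher and returns the narrower Mono type.
    // Mono.from takes the first element, if any, and cancels the source.
    static Mono<String> firstUpperCased(Publisher<String> input) {
        return Mono.from(input).map(String::toUpperCase);
    }

    // Blocking helpers for the demo only.
    static String demoWithFlux() {
        return firstUpperCased(Flux.just("a", "b")).block();
    }

    static String demoWithMono() {
        return firstUpperCased(Mono.just("c")).block();
    }

    public static void main(String[] args) {
        System.out.println(demoWithFlux()); // A
        System.out.println(demoWithMono()); // C
    }
}
```

Callers can pass a Mono, a Flux, or a Publisher from another Reactive Streams library without converting it first.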

Note that using state in the java.util.function / lambdas used within Mono operators should be avoided, as these may be shared between several Subscribers.
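The following sketch shows how captured state leaks between subscriptions, and one possible remedy using `Mono.defer` so that each subscription gets fresh state. It assumes reactor-core; `SharedStateDemo` and its methods are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class SharedStateDemo {

    // The counter is captured once, so every Subscriber of the returned Mono shares it.
    static Mono<Integer> sharedCounter() {
        AtomicInteger counter = new AtomicInteger();
        return Mono.fromSupplier(counter::incrementAndGet);
    }

    // Mono.defer runs its supplier on each subscription, creating a new counter every time.
    static Mono<Integer> perSubscriptionCounter() {
        return Mono.defer(() -> {
            AtomicInteger counter = new AtomicInteger();
            return Mono.fromSupplier(counter::incrementAndGet);
        });
    }

    public static void main(String[] args) {
        Mono<Integer> shared = sharedCounter();
        System.out.println(shared.block());   // 1
        System.out.println(shared.block());   // 2: the second subscription sees leftover state

        Mono<Integer> isolated = perSubscriptionCounter();
        System.out.println(isolated.block()); // 1
        System.out.println(isolated.block()); // 1
    }
}
```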

Code examples

Code example source: spring-projects/spring-data-redis

/**
 * Get the size of sorted set with {@literal key}.
 *
 * @param key must not be {@literal null}.
 * @return
 * @see <a href="http://redis.io/commands/zcard">Redis Documentation: ZCARD</a>
 */
default Mono<Long> zCard(ByteBuffer key) {
  Assert.notNull(key, "Key must not be null!");
  return zCard(Mono.just(new KeyCommand(key))).next().map(NumericResponse::getOutput);
}

Code example source: spring-projects/spring-framework

@Override
public ResponseSpec exchange() {
  ClientResponse clientResponse = this.bodySpec.exchange().block(getTimeout());
  Assert.state(clientResponse != null, "No ClientResponse");
  WiretapConnector.Info info = wiretapConnector.claimRequest(this.requestId);
  return new DefaultResponseSpec(info, clientResponse, this.uriTemplate, getTimeout());
}

Code example source: spring-projects/spring-framework

/**
 * Adapt the given request processor function to a filter function that only
 * operates on the {@code ClientRequest}.
 * @param processor the request processor
 * @return the resulting filter adapter
 */
static ExchangeFilterFunction ofRequestProcessor(Function<ClientRequest, Mono<ClientRequest>> processor) {
  Assert.notNull(processor, "ClientRequest Function must not be null");
  return (request, next) -> processor.apply(request).flatMap(next::exchange);
}

Code example source: spring-projects/spring-framework

private <R> Mono<R> createNotFoundError() {
  return Mono.defer(() -> {
    Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND, "No matching handler");
    return Mono.error(ex);
  });
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
  WebHandler webHandler = (WebHandler) handler;
  Mono<Void> mono = webHandler.handle(exchange);
  return mono.then(Mono.empty());
}

Code example source: spring-projects/spring-framework

@Override
public Mono<HandlerFunction<ServerResponse>> route(ServerRequest request) {
  return this.first.route(request)
      .map(RouterFunctions::cast)
      .switchIfEmpty(Mono.defer(() -> this.second.route(request).map(RouterFunctions::cast)));
}

Code example source: spring-projects/spring-data-redis

@Override
public Flux<BooleanResponse<KeyCommand>> exists(Publisher<KeyCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    return cmd.exists(command.getKey()).map(LettuceConverters.longToBooleanConverter()::convert)
        .map((value) -> new BooleanResponse<>(command, value));
  }));
}

Code example source: spring-projects/spring-data-redis

/**
 * Get elements in {@literal range} from sorted set in reverse {@literal score} ordering.
 *
 * @param key must not be {@literal null}.
 * @param range must not be {@literal null}.
 * @return
 * @see <a href="http://redis.io/commands/zrevrange">Redis Documentation: ZREVRANGE</a>
 */
default Flux<ByteBuffer> zRevRange(ByteBuffer key, Range<Long> range) {
  Assert.notNull(key, "Key must not be null!");
  return zRange(Mono.just(ZRangeCommand.reverseValuesWithin(range).from(key))).flatMap(CommandResponse::getOutput)
      .map(tuple -> ByteBuffer.wrap(tuple.getValue()));
}

Code example source: spring-projects/spring-data-redis

@Override
public Mono<Long> unionAndStore(K key, Collection<K> otherKeys, K destKey, Aggregate aggregate, Weights weights) {
  Assert.notNull(key, "Key must not be null!");
  Assert.notNull(otherKeys, "Other keys must not be null!");
  Assert.notNull(destKey, "Destination key must not be null!");
  Assert.notNull(aggregate, "Aggregate must not be null!");
  Assert.notNull(weights, "Weights must not be null!");
  return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
      .map(this::rawKey) //
      .collectList() //
      .flatMap(serialized -> connection.zUnionStore(rawKey(destKey), serialized, weights, aggregate)));
}

Code example source: spring-projects/spring-data-redis

@Override
public Mono<Long> intersectAndStore(K key, Collection<K> otherKeys, K destKey, Aggregate aggregate, Weights weights) {
  Assert.notNull(key, "Key must not be null!");
  Assert.notNull(otherKeys, "Other keys must not be null!");
  Assert.notNull(destKey, "Destination key must not be null!");
  Assert.notNull(aggregate, "Aggregate must not be null!");
  Assert.notNull(weights, "Weights must not be null!");
  return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) //
      .map(this::rawKey) //
      .collectList() //
      .flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized, weights, aggregate)));
}

Code example source: spring-projects/spring-framework

@Override
public Mono<ServerResponse> syncBody(Object body) {
  Assert.notNull(body, "Body must not be null");
  Assert.isTrue(!(body instanceof Publisher),
      "Please specify the element class by using body(Publisher, Class)");
  return new DefaultEntityResponseBuilder<>(body,
      BodyInserters.fromObject(body))
      .headers(this.headers)
      .status(this.statusCode)
      .build()
      .map(entityResponse -> entityResponse);
}

Code example source: spring-projects/spring-data-redis

@Override
@SafeVarargs
public final Mono<Long> remove(K key, V... members) {
  Assert.notNull(key, "Key must not be null!");
  Assert.notEmpty(members, "Members must not be null or empty!");
  Assert.noNullElements(members, "Members must not contain null elements!");
  return template.createMono(connection -> Flux.fromArray(members) //
      .map(this::rawValue) //
      .collectList() //
      .flatMap(serialized -> connection.zSetCommands().zRem(rawKey(key), serialized)));
}

Code example source: spring-projects/spring-framework

// Excerpt only: the source omits lines wherever "// ..." appears.
// ...
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  Assert.notNull(inputStream, "'inputStream' must not be null");
  Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
  Assert.notNull(elementType, "'elementType' must not be null");
  // ...
    return Mono.from(inputStream).map(value ->
        encodeValue(value, mimeType, bufferFactory, elementType, hints, encoding)).flux();
  // ...
        byte[] separator =
            STREAM_SEPARATORS.getOrDefault(mediaType, NEWLINE_SEPARATOR);
        return Flux.from(inputStream).map(value -> {
          DataBuffer buffer =
              encodeValue(value, mimeType, bufferFactory, elementType, hints,
          // ...
        ResolvableType listType =
            ResolvableType.forClassWithGenerics(List.class, elementType);
        return Flux.from(inputStream).collectList().map(list ->
            encodeValue(list, mimeType, bufferFactory, listType, hints,
                encoding)).flux();
      });
  // ...

Code example source: spring-projects/spring-framework

/**
 * Return a new {@code DataBuffer} composed from joining together the given
 * {@code dataBuffers} elements. Depending on the {@link DataBuffer} type,
 * the returned buffer may be a single buffer containing all data of the
 * provided buffers, or it may be a zero-copy, composite with references to
 * the given buffers.
 * <p>If {@code dataBuffers} produces an error or if there is a cancel
 * signal, then all accumulated buffers will be
 * {@linkplain #release(DataBuffer) released}.
 * <p>Note that the given data buffers do <strong>not</strong> have to be
 * released. They will be released as part of the returned composite.
 * @param dataBuffers the data buffers that are to be composed
 * @return a buffer that is composed from the {@code dataBuffers} argument
 * @since 5.0.3
 */
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
  Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
  return Flux.from(dataBuffers)
      .collectList()
      .filter(list -> !list.isEmpty())
      .map(list -> list.get(0).factory().join(list))
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

Code example source: spring-projects/spring-framework

@Override
public Mono<ServerResponse> build(Publisher<Void> voidPublisher) {
  Assert.notNull(voidPublisher, "Publisher must not be null");
  return build((exchange, handlerStrategies) ->
      Mono.from(voidPublisher).then(exchange.getResponse().setComplete()));
}

Code example source: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

Code example source: spring-projects/spring-framework

private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
  Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
  byte[] bytes = text.toString().getBytes(mediaType.getCharset());
  return Mono.defer(() ->
      Mono.just(bufferFactory.allocateBuffer(bytes.length).write(bytes)));
}

Code example source: spring-projects/spring-data-redis

@Override
public Mono<Long> addAll(K key, Collection<? extends TypedTuple<V>> tuples) {
  Assert.notNull(key, "Key must not be null!");
  Assert.notNull(tuples, "Tuples must not be null!");
  return createMono(connection -> Flux.fromIterable(tuples) //
      .map(t -> new DefaultTuple(ByteUtils.getBytes(rawValue(t.getValue())), t.getScore())) //
      .collectList() //
      .flatMap(serialized -> connection.zAdd(rawKey(key), serialized)));
}

Code example source: spring-projects/spring-data-redis

@Override
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Flux<ByteBuffer> result = cmd.hvals(command.getKey());
    return Mono.just(new CommandResponse<>(command, result));
  }));
}

Code example source: spring-projects/spring-framework

// Excerpt only: the source omits lines wherever "// ..." appears.
private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  Assert.notNull(tokens, "'tokens' must not be null");
  Assert.notNull(elementType, "'elementType' must not be null");
  // ...
      getObjectMapper().readerFor(javaType));
  return tokens.flatMap(tokenBuffer -> {
    try {
      Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper()));
      // ...
        });
      return Mono.justOrEmpty(value);
    // ...
      return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
    // ...
      return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
    // ...
      return Mono.error(new DecodingException("I/O error while parsing input stream", ex));
  // ...
