reactor.core.publisher.Mono.repeatWhen()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(311)

本文整理了Java中reactor.core.publisher.Mono.repeatWhen()方法的一些代码示例,展示了Mono.repeatWhen()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.repeatWhen()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:repeatWhen

Mono.repeatWhen介绍

[英]Repeatedly subscribe to this Mono when a companion sequence emits elements in response to the flux completion signal. Any terminal signal from the companion sequence will terminate the resulting Flux with the same signal immediately.

If the companion sequence signals when this Mono is active, the repeat attempt is suppressed.

Note that if the companion Publisher created by the repeatFactoryemits Context as trigger objects, the content of these Context will be added to the operator's own Context.
[中]当伴随序列响应通量完成信号发射元素时,重复订阅此单声道。来自伴随序列的任何终端信号将立即用相同的信号终止产生的通量。
如果伴奏序列在该单声道激活时发出信号,则重复尝试被抑制。
请注意,如果RepeatFactoryEmit创建的伴随发布者将上下文作为触发器对象,则这些上下文的内容将添加到操作员自己的上下文中。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: reactor/reactor-core

return this.repeatWhen(o -> repeatFactory.apply(o
      .zipWith(iterations, 1, (c, i) -> i)))
      .next();

代码示例来源:origin: reactor/reactor-core

Flux<Integer> exponentialRepeatScenario1() {
  AtomicInteger i = new AtomicInteger();
  return Mono.fromCallable(i::incrementAndGet)
        .repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                      .flatMap(time -> Mono.delay(Duration.ofSeconds(
                          time))));
}

代码示例来源:origin: reactor/reactor-core

Flux<String> exponentialRepeatScenario2() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.success("hey");
    }
    else {
      s.success();
    }
  }).repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
  return (exchange, chain) -> {
    trace("Entering retry-filter");
    // chain.filter returns a Mono<Void>
    Publisher<Void> publisher = chain.filter(exchange)
        //.log("retry-filter", Level.INFO)
        .doOnSuccessOrError((aVoid, throwable) -> {
          int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
          int newIteration = iteration + 1;
          trace("setting new iteration in attr %d", newIteration);
          exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
        });
    if (retry != null) {
      // retryWhen returns a Mono<Void>
      // retry needs to go before repeat
      publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
    }
    if (repeat != null) {
      // repeatWhen returns a Flux<Void>
      // so this needs to be last and the variable a Publisher<Void>
      publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
    }
    return Mono.fromDirect(publisher);
  };
}

代码示例来源:origin: org.springframework/spring-messaging

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param whenFactory
 * @return
 * @see reactor.core.publisher.Mono#repeatWhen(java.util.function.Function)
 */
public final Flux<T> repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> whenFactory) {
  return boxed.repeatWhen(whenFactory);
}
/**

代码示例来源:origin: io.projectreactor/reactor-core

return this.repeatWhen(o -> repeatFactory.apply(o
      .zipWith(iterations, 1, (c, i) -> i)))
      .next();

代码示例来源:origin: apache/servicemix-bundles

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-messaging

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

相关文章

微信公众号

最新文章

更多

Mono类方法