io.reactivex.Single.retryWhen()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(4.7k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中io.reactivex.Single.retryWhen()方法的一些代码示例,展示了Single.retryWhen()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.retryWhen()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:retryWhen

Single.retryWhen介绍

[英]Re-subscribes to the current Single if and when the Publisher returned by the handler function signals a value.

If the Publisher signals an onComplete, the resulting Single will signal a NoSuchElementException.

Note that the inner Publisher returned by the handler function should signal either onNext, onError or onComplete in response to the received Throwable to indicate the operator should retry or terminate. If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner Publisher signals onError or onComplete while the upstream is active, the sequence is terminated with the same signal immediately.

The following example demonstrates how to retry an asynchronous source with a delay:

Single.timer(1, TimeUnit.SECONDS) 
.doOnSubscribe(s -> System.out.println("subscribing")) 
.map(v -> { throw new RuntimeException(); }) 
.retryWhen(errors -> { 
AtomicInteger counter = new AtomicInteger(); 
return errors 
.takeWhile(e -> counter.getAndIncrement() != 3) 
.flatMap(e -> { 
System.out.println("delay retry by " + counter.get() + " second(s)"); 
return Flowable.timer(counter.get(), TimeUnit.SECONDS); 
}); 
}) 
.blockingGet();

Scheduler: retryWhen does not operate by default on a particular Scheduler.
[中]如果处理器函数返回的发布服务器发出一个值的信号,则重新订阅当前的单个。
如果发布者发出onComplete的信号,则生成的单曲将发出NosTouchElementException的信号。
请注意,处理程序函数返回的内部发布程序应发出onNext、onError或onComplete信号,以响应接收到的Throwable,指示操作员应重试或终止。如果操作员的上游是异步的,则发出onNext后紧接着onComplete的信号可能会导致序列立即完成。类似地,如果在上游处于活动状态时,此内部发布服务器发出onError或onComplete信号,则序列将立即以相同的信号终止。
以下示例演示如何延迟重试异步源:

Single.timer(1, TimeUnit.SECONDS) 
.doOnSubscribe(s -> System.out.println("subscribing")) 
.map(v -> { throw new RuntimeException(); }) 
.retryWhen(errors -> { 
AtomicInteger counter = new AtomicInteger(); 
return errors 
.takeWhile(e -> counter.getAndIncrement() != 3) 
.flatMap(e -> { 
System.out.println("delay retry by " + counter.get() + " second(s)"); 
return Flowable.timer(counter.get(), TimeUnit.SECONDS); 
}); 
}) 
.blockingGet();

调度程序:默认情况下,retryWhen不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryWhenNull() {
  error.retryWhen(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryWhenFunctionReturnsNull() {
  error.retryWhen(new Function<Flowable<? extends Throwable>, Publisher<Object>>() {
    @Override
    public Publisher<Object> apply(Flowable<? extends Throwable> e) {
      return null;
    }
  }).blockingGet();
}

代码示例来源:origin: com.salesforce.servicelibs/rxgrpc-stub

@Override
  public Single<O> apply(final Flowable<I> request) {
    return Single.defer(new Callable<SingleSource<O>>() {
      @Override
      public SingleSource<O> call() throws Exception {
        return operation.apply(request);
      }
    }).retryWhen(handler);
  }
};

代码示例来源:origin: io.gravitee.repository/gravitee-repository-elasticsearch

@Bean
  public IndexNameGenerator indexNameGenerator(RepositoryConfiguration repositoryConfiguration, Client client) {
    // Wait for a connection to ES and retry each 5 seconds
    Single<Integer> singleVersion = client.getVersion()
        .retryWhen(error -> error.flatMap(
            throwable -> Observable.just(new Object()).delay(5, TimeUnit.SECONDS).toFlowable(BackpressureStrategy.LATEST)));

    singleVersion.subscribe();

    Integer version = singleVersion.blockingGet();

    if (version == 6 || repositoryConfiguration.isPerTypeIndex()) {
      return new PerTypeIndexNameGenerator(repositoryConfiguration.getIndexName());
    } else {
      return new MultiTypeIndexNameGenerator(repositoryConfiguration.getIndexName());
    }
  }
}

代码示例来源:origin: AppStoreFoundation/asf-sdk

private void handleCampaign() {
 compositeDisposable.add(ReactiveNetwork.observeInternetConnectivity()
   .subscribeOn(Schedulers.io())
   .filter(hasInternet -> hasInternet)
   .filter(__ -> this.campaignId == null)
   .firstOrError()
   .flatMap(__ -> campaignService.getCampaign())
   .retryWhen(throwableObservable -> throwableObservable.toObservable()
     .flatMap(throwable -> {
      throwable.printStackTrace();
      return ReactiveNetwork.observeInternetConnectivity();
     })
     .flatMap(this::retryIfNetworkAvailable)
     .toFlowable(BackpressureStrategy.LATEST))
   .doOnSuccess(this::processCampaign)
   .subscribe());
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test1() {
  Single.timer(1, TimeUnit.SECONDS)
    .doOnSubscribe(s -> System.out.println("subscribing"))
    .map(v -> { throw new RuntimeException(); })
    .retryWhen(errors -> {
      AtomicInteger counter = new AtomicInteger();
      return errors
           .takeWhile(e -> counter.getAndIncrement() != 3)
           .flatMap(e -> {
             System.out.println("delay retry by " + counter.get() + " second(s)");
              return Flowable.timer(counter.get(), TimeUnit.SECONDS);
           });
    })
    .blockingGet();
}

相关文章

微信公众号

最新文章

更多