io.reactivex.Single.cache()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(115)

本文整理了Java中io.reactivex.Single.cache()方法的一些代码示例,展示了Single.cache()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.cache()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:cache

Single.cache介绍

[英]Stores the success value or exception from the current Single and replays it to late SingleObservers.

The returned Single subscribes to the current Single when the first SingleObserver subscribes. Scheduler: cache does not operate by default on a particular Scheduler.
[中]存储当前单曲的成功值或异常,并将其重放到后期单曲。
当第一个SingleObserver订阅时,返回的Single订阅当前Single。调度程序:默认情况下,缓存不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

PublishProcessor<Integer> pp = PublishProcessor.create();
final Single<Integer> cached = pp.single(-99).cache();

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normal() {
  Single<Integer> cache = Single.just(1).cache();
  cache
  .test()
  .assertResult(1);
  cache
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void addRemoveRace() {
  for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
    PublishProcessor<Integer> pp = PublishProcessor.create();
    final Single<Integer> cached = pp.single(-99).cache();
    final TestObserver<Integer> to1 = cached.test();
    Runnable r1 = new Runnable() {
      @Override
      public void run() {
        to1.cancel();
      }
    };
    Runnable r2 = new Runnable() {
      @Override
      public void run() {
        cached.test();
      }
    };
    TestHelper.race(r1, r2);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Single<Object> cache = Single.error(new TestException())
  .cache();
  cache
  .test()
  .assertFailure(TestException.class);
  cache
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: pockethub/PocketHub

/**
 * Create dialog helper to display assignees
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public AssigneeDialog(final BaseActivity activity,
    final int requestCode, final Repository repository) {
  this.activity = activity;
  this.requestCode = requestCode;
  GitHubRequest<Response<Page<User>>> gitHubRequest = page -> ServiceGenerator
      .createService(activity, IssueAssigneeService.class)
      .getAssignees(repository.owner().login(), repository.name(), page);
  assigneeSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
      .flatMap(page -> Observable.fromIterable(page.items()))
      .toSortedList((o1, o2) -> CASE_INSENSITIVE_ORDER.compare(o1.login(), o2.login()))
      .compose(RxProgress.bindToLifecycle(activity, R.string.loading_collaborators))
      .cache();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayed() {
  PublishSubject<Integer> ps = PublishSubject.create();
  Single<Integer> cache = ps.single(-99).cache();
  TestObserver<Integer> to1 = cache.test();
  TestObserver<Integer> to2 = cache.test();
  ps.onNext(1);
  ps.onComplete();
  to1.assertResult(1);
  to2.assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void cancelImmediately() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  Single<Integer> cached = pp.single(-99).cache();
  TestObserver<Integer> to = cached.test(true);
  pp.onNext(1);
  pp.onComplete();
  to.assertEmpty();
  cached.test().assertResult(1);
}

代码示例来源:origin: pockethub/PocketHub

/**
 * Create dialog helper to display milestones
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public MilestoneDialog(final BaseActivity activity,
    final int requestCode, final Repository repository) {
  this.activity = activity;
  this.requestCode = requestCode;
  GitHubRequest<Response<Page<Milestone>>> gitHubRequest = page -> ServiceGenerator
      .createService(activity, IssueMilestoneService.class)
      .getRepositoryMilestones(repository.owner().login(), repository.name(),
          "open", page);
  milestoneSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
      .flatMap(page -> Observable.fromIterable(page.items()))
      .toSortedList((m1, m2) -> CASE_INSENSITIVE_ORDER.compare(m1.title(), m2.title()))
      .compose(RxProgress.bindToLifecycle(activity, R.string.loading_milestones))
      .cache();
}

代码示例来源:origin: pockethub/PocketHub

/**
 * Create dialog helper to display labels
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public LabelsDialog(final BaseActivity activity,
    final int requestCode, final Repository repository) {
  this.activity = activity;
  this.requestCode = requestCode;
  GitHubRequest<Response<Page<Label>>> gitHubRequest = page -> ServiceGenerator
      .createService(activity, IssueLabelService.class)
      .getRepositoryLabels(repository.owner().login(), repository.name(), page);
  labelsSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
      .flatMap(page -> Observable.fromIterable(page.items()))
      .toSortedList((o1, o2) -> CASE_INSENSITIVE_ORDER.compare(o1.name(), o2.name()))
      .compose(RxProgress.bindToLifecycle(activity, R.string.loading_labels))
      .cache();
}

代码示例来源:origin: pockethub/PocketHub

/**
 * Create dialog helper to display refs
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public RefDialog(final BaseActivity activity,
    final int requestCode, final Repository repository) {
  this.activity = activity;
  this.requestCode = requestCode;
  GitHubRequest<Response<Page<GitReference>>> gitHubRequest = page -> ServiceGenerator
      .createService(activity, GitService.class)
      .getGitReferences(repository.owner().login(), repository.name(), page);
  refSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
      .flatMap(page -> Observable.fromIterable(page.items()))
      .filter(RefUtils::isValid)
      .toSortedList((o1, o2) -> CASE_INSENSITIVE_ORDER.compare(o1.ref(), o2.ref()))
      .compose(RxProgress.bindToLifecycle(activity, R.string.loading_refs))
      .cache();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void delayedDisposed() {
  PublishSubject<Integer> ps = PublishSubject.create();
  Single<Integer> cache = ps.single(-99).cache();
  TestObserver<Integer> to1 = cache.test();
  TestObserver<Integer> to2 = cache.test();
  to1.cancel();
  ps.onNext(1);
  ps.onComplete();
  to1.assertNoValues().assertNoErrors().assertNotComplete();
  to2.assertResult(1);
}

代码示例来源:origin: Polidea/RxAndroidBle

private void reset() {
  hasCachedResults = false;
  this.deviceServicesObservable = getListOfServicesFromGatt()
      .map(wrapIntoRxBleDeviceServices())
      .switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
      .doOnSuccess(Functions.actionConsumer(new Action() {
        @Override
        public void run() throws Exception {
          hasCachedResults = true;
        }
      }))
      .doOnError(Functions.actionConsumer(new Action() {
        @Override
        public void run() {
          reset();
        }
      }))
      .cache();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void crossCancel() {
  PublishSubject<Integer> ps = PublishSubject.create();
  Single<Integer> cache = ps.single(-99).cache();
  final TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      ts1.cancel();
    }
  };
  cache.toFlowable().subscribe(ts2);
  cache.toFlowable().subscribe(ts1);
  ps.onNext(1);
  ps.onComplete();
  ts1.assertNoValues().assertNoErrors().assertNotComplete();
  ts2.assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void crossCancelOnError() {
  PublishSubject<Integer> ps = PublishSubject.create();
  Single<Integer> cache = ps.single(-99).cache();
  final TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>() {
    @Override
    public void onError(Throwable t) {
      super.onError(t);
      ts1.cancel();
    }
  };
  cache.toFlowable().subscribe(ts2);
  cache.toFlowable().subscribe(ts1);
  ps.onError(new TestException());
  ts1.assertNoValues().assertNoErrors().assertNotComplete();
  ts2.assertFailure(TestException.class);
}

代码示例来源:origin: com.microsoft.rest.v2/client-runtime

/**
 * Creates a buffered HTTP response.
 * @param innerHttpResponse The HTTP response to buffer.
 */
public BufferedHttpResponse(HttpResponse innerHttpResponse) {
  this.innerHttpResponse = innerHttpResponse;
  this.cachedBody = innerHttpResponse.bodyAsByteArray().cache();
}

代码示例来源:origin: radixdlt/radixdlt-java

public RadixJsonRpcClient(WebSocketClient wsClient) {
  this.wsClient = wsClient;
  final JsonParser parser = new JsonParser();
  this.messages = this.wsClient.getMessages()
    .map(msg -> parser.parse(msg).getAsJsonObject())
    .publish()
    .refCount();
  if (!CHECK_API_VERSION) {
    this.serverApiVersion = Single.just(API_VERSION);
  } else {
    this.serverApiVersion = jsonRpcCall("Api.getVersion")
      .map(result -> result.getAsJsonObject().get("version").getAsInt())
      .cache();
  }
  this.universeConfig = jsonRpcCall("Universe.getUniverse")
    .map(result -> RadixJson.getGson().fromJson(result, RadixUniverseConfig.class))
    .cache();
}

代码示例来源:origin: WaylonBrown/LifecycleAwareRx

/**
 * NOTE: This throws a NoSuchElementException if the item is filtered out since a Single can't be empty, so the 
 * onError is called after onDestroy() when using Single().
 *
 * @param upstream
 * @return
 */
@Override
public SingleSource<T> apply(Single<T> upstream) {
  Single<T> transformedStream = upstream
      .cache() // Cache to replay emitted values to late subscriber
      .filter(filterIfDestroyedPredicate) // Filter to stop emitting items once LifecycleOwner is destroyed
      .toSingle();
  setReactiveType((R)transformedStream);
  
  return transformedStream;
}

代码示例来源:origin: io.knotx/knotx-databridge-core

private Single<JsonObject> fetchServiceData(DataSourceEntry service, KnotContext request) {
 LOGGER.debug("Fetching data from service {} {}", service.getAddress(), service.getParams());
 try {
  return request.getCache()
    .get(service.getCacheKey(), () -> {
     LOGGER.debug("Requesting data from adapter {} with params {}", service.getAddress(),
       service.getParams());
     return serviceEngine.doServiceCall(service, request).cache();
    });
 } catch (ExecutionException e) {
  LOGGER.fatal("Unable to get service data {}", e);
  return Single.error(e);
 }
}

代码示例来源:origin: Cognifide/knotx

private Single<JsonObject> fetchServiceData(ServiceEntry service, KnotContext request) {
 LOGGER.debug("Fetching data from service {} {}", service.getAddress(), service.getParams());
 try {
  return request.getCache()
    .get(service.getCacheKey(), () -> {
     LOGGER.debug("Requesting data from adapter {} with params {}", service.getAddress(),
       service.getParams());
     return serviceEngine.doServiceCall(service, request).cache();
    });
 } catch (ExecutionException e) {
  LOGGER.fatal("Unable to get service data {}", e);
  return Single.error(e);
 }
}

代码示例来源:origin: d4rken/RxShell

public Session(RxShell.Session session, CmdProcessor cmdProcessor) {
  this.session = session;
  this.cmdProcessor = cmdProcessor;
  this.waitFor = session.waitFor().cache();
  this.cancel = session.cancel()
      .doOnComplete(() -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("cancel():doOnComplete");})
      .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "cancel():doOnError");})
      .cache();
  this.close = cmdProcessor.isIdle()
      .filter(i -> i)
      .first(true)
      .flatMap(i -> session.close())
      .doOnSuccess(s -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("close():doOnSuccess %s", s);})
      .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "close():doOnError");})
      .cache();
}

相关文章

微信公众号

最新文章

更多