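This article collects code examples for the Java method io.reactivex.Single.cache() and shows how Single.cache() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Single.cache() method are as follows: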
Package path: io.reactivex.Single
Class name: Single
Method name: cache
Stores the success value or exception from the current Single and replays it to late SingleObservers.
The returned Single subscribes to the current Single when the first SingleObserver subscribes. Scheduler: cache does not operate by default on a particular Scheduler.
Code example origin: ReactiveX/RxJava
PublishProcessor<Integer> pp = PublishProcessor.create();
final Single<Integer> cached = pp.single(-99).cache();
Code example origin: ReactiveX/RxJava
@Test
public void normal() {
    Single<Integer> cache = Single.just(1).cache();
    cache
        .test()
        .assertResult(1);
    cache
        .test()
        .assertResult(1);
}
Code example origin: ReactiveX/RxJava
@Test
public void addRemoveRace() {
    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
        PublishProcessor<Integer> pp = PublishProcessor.create();
        final Single<Integer> cached = pp.single(-99).cache();
        final TestObserver<Integer> to1 = cached.test();
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                to1.cancel();
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                cached.test();
            }
        };
        // Race removing one observer against adding another
        TestHelper.race(r1, r2);
    }
}
Code example origin: ReactiveX/RxJava
@Test
public void error() {
    Single<Object> cache = Single.error(new TestException())
        .cache();
    cache
        .test()
        .assertFailure(TestException.class);
    cache
        .test()
        .assertFailure(TestException.class);
}
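As the test above suggests, a failure is cached just like a success value. The following sketch, assuming RxJava 2.x on the classpath and using a made-up class name CachedErrorDemo, counts how often a failing source is invoked when two subscribers arrive one after the other.

```java
import io.reactivex.Single;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
class CachedErrorDemo {

    // Returns {upstream calls, 1 if both subscribers saw the same exception instance, else 0}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();

        Single<String> cached = Single.<String>fromCallable(() -> {
            calls.incrementAndGet();
            throw new IllegalStateException("boom");
        }).cache();

        Throwable first = failureOf(cached);
        Throwable second = failureOf(cached);

        return new int[] {calls.get(), (first != null && first == second) ? 1 : 0};
    }

    // Subscribes, blocks, and returns the exception that was signalled (or null on success).
    private static Throwable failureOf(Single<?> single) {
        try {
            single.blockingGet();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("calls=" + r[0] + ", sameException=" + (r[1] == 1));
    }
}
```

In practice this means a cached Single will not retry a failed operation on resubscription; if a retry is wanted, a new Single has to be created.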
Code example origin: pockethub/PocketHub
/**
 * Create dialog helper to display assignees
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public AssigneeDialog(final BaseActivity activity,
        final int requestCode, final Repository repository) {
    this.activity = activity;
    this.requestCode = requestCode;
    GitHubRequest<Response<Page<User>>> gitHubRequest = page -> ServiceGenerator
            .createService(activity, IssueAssigneeService.class)
            .getAssignees(repository.owner().login(), repository.name(), page);
    assigneeSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
            .flatMap(page -> Observable.fromIterable(page.items()))
            .toSortedList((o1, o2) -> CASE_INSENSITIVE_ORDER.compare(o1.login(), o2.login()))
            .compose(RxProgress.bindToLifecycle(activity, R.string.loading_collaborators))
            .cache();
}
Code example origin: ReactiveX/RxJava
@Test
public void delayed() {
    PublishSubject<Integer> ps = PublishSubject.create();
    Single<Integer> cache = ps.single(-99).cache();
    TestObserver<Integer> to1 = cache.test();
    TestObserver<Integer> to2 = cache.test();
    ps.onNext(1);
    ps.onComplete();
    to1.assertResult(1);
    to2.assertResult(1);
}
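The test above subscribes two observers before the source has emitted anything. A sketch of the same situation outside the test harness, assuming RxJava 2.x on the classpath (the class name SharedSubscriptionDemo and the subscription counter are illustrative only), makes it visible that both observers share one upstream subscription:

```java
import io.reactivex.Single;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
class SharedSubscriptionDemo {

    // Returns "<upstream subscriptions>:<values received by the two observers>".
    static String run() {
        AtomicInteger subscriptions = new AtomicInteger();
        PublishSubject<Integer> ps = PublishSubject.create();

        Single<Integer> cached = ps
                .doOnSubscribe(d -> subscriptions.incrementAndGet()) // count upstream subscriptions
                .single(-99)
                .cache();

        List<Integer> received = new ArrayList<>();
        cached.subscribe(v -> received.add(v)); // first observer connects to the subject
        cached.subscribe(v -> received.add(v)); // second observer only registers with the cache

        ps.onNext(7);
        ps.onComplete(); // single(-99) emits once the subject completes

        return subscriptions.get() + ":" + received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```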
Code example origin: ReactiveX/RxJava
@Test
public void cancelImmediately() {
    PublishProcessor<Integer> pp = PublishProcessor.create();
    Single<Integer> cached = pp.single(-99).cache();
    // test(true) creates an observer that is already cancelled
    TestObserver<Integer> to = cached.test(true);
    pp.onNext(1);
    pp.onComplete();
    to.assertEmpty();
    // A later subscriber still receives the value
    cached.test().assertResult(1);
}
Code example origin: pockethub/PocketHub
/**
 * Create dialog helper to display milestones
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public MilestoneDialog(final BaseActivity activity,
        final int requestCode, final Repository repository) {
    this.activity = activity;
    this.requestCode = requestCode;
    GitHubRequest<Response<Page<Milestone>>> gitHubRequest = page -> ServiceGenerator
            .createService(activity, IssueMilestoneService.class)
            .getRepositoryMilestones(repository.owner().login(), repository.name(),
                    "open", page);
    milestoneSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
            .flatMap(page -> Observable.fromIterable(page.items()))
            .toSortedList((m1, m2) -> CASE_INSENSITIVE_ORDER.compare(m1.title(), m2.title()))
            .compose(RxProgress.bindToLifecycle(activity, R.string.loading_milestones))
            .cache();
}
Code example origin: pockethub/PocketHub
/**
 * Create dialog helper to display labels
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public LabelsDialog(final BaseActivity activity,
        final int requestCode, final Repository repository) {
    this.activity = activity;
    this.requestCode = requestCode;
    GitHubRequest<Response<Page<Label>>> gitHubRequest = page -> ServiceGenerator
            .createService(activity, IssueLabelService.class)
            .getRepositoryLabels(repository.owner().login(), repository.name(), page);
    labelsSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
            .flatMap(page -> Observable.fromIterable(page.items()))
            .toSortedList((o1, o2) -> CASE_INSENSITIVE_ORDER.compare(o1.name(), o2.name()))
            .compose(RxProgress.bindToLifecycle(activity, R.string.loading_labels))
            .cache();
}
Code example origin: pockethub/PocketHub
/**
 * Create dialog helper to display refs
 *
 * @param activity
 * @param requestCode
 * @param repository
 */
public RefDialog(final BaseActivity activity,
        final int requestCode, final Repository repository) {
    this.activity = activity;
    this.requestCode = requestCode;
    GitHubRequest<Response<Page<GitReference>>> gitHubRequest = page -> ServiceGenerator
            .createService(activity, GitService.class)
            .getGitReferences(repository.owner().login(), repository.name(), page);
    refSingle = RxPageUtil.getAllPages(gitHubRequest, 1)
            .flatMap(page -> Observable.fromIterable(page.items()))
            .filter(RefUtils::isValid)
            .toSortedList((o1, o2) -> CASE_INSENSITIVE_ORDER.compare(o1.ref(), o2.ref()))
            .compose(RxProgress.bindToLifecycle(activity, R.string.loading_refs))
            .cache();
}
Code example origin: ReactiveX/RxJava
@Test
public void delayedDisposed() {
    PublishSubject<Integer> ps = PublishSubject.create();
    Single<Integer> cache = ps.single(-99).cache();
    TestObserver<Integer> to1 = cache.test();
    TestObserver<Integer> to2 = cache.test();
    // Cancelling one observer does not affect the other
    to1.cancel();
    ps.onNext(1);
    ps.onComplete();
    to1.assertNoValues().assertNoErrors().assertNotComplete();
    to2.assertResult(1);
}
Code example origin: Polidea/RxAndroidBle
private void reset() {
    hasCachedResults = false;
    this.deviceServicesObservable = getListOfServicesFromGatt()
            .map(wrapIntoRxBleDeviceServices())
            .switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
            .doOnSuccess(Functions.actionConsumer(new Action() {
                @Override
                public void run() throws Exception {
                    hasCachedResults = true;
                }
            }))
            .doOnError(Functions.actionConsumer(new Action() {
                @Override
                public void run() {
                    reset();
                }
            }))
            .cache();
}
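The snippet above rebuilds its cached Single from doOnError, presumably so that a failed attempt is not replayed to every later caller. A minimal sketch of that pattern, assuming RxJava 2.x on the classpath, could look like the following; the class ResettableCacheDemo, its fields, and the simulated failure are all hypothetical and not taken from RxAndroidBle.

```java
import io.reactivex.Single;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class illustrating a "reset the cache on error" pattern.
class ResettableCacheDemo {
    private final AtomicInteger attempts = new AtomicInteger();
    private volatile Single<String> cached;

    ResettableCacheDemo() {
        reset();
    }

    // Replaces the cached Single with a fresh one that has not run yet.
    private void reset() {
        cached = Single.<String>fromCallable(() -> {
                    // Simulated operation: only the first attempt fails.
                    if (attempts.incrementAndGet() == 1) {
                        throw new IllegalStateException("first attempt fails");
                    }
                    return "ok";
                })
                .doOnError(e -> reset()) // on failure, later callers get a new attempt
                .cache();
    }

    Single<String> current() {
        return cached;
    }

    // Returns "<first result>,<second result>,<attempt count>".
    static String run() {
        ResettableCacheDemo demo = new ResettableCacheDemo();
        String firstResult;
        try {
            firstResult = demo.current().blockingGet();
        } catch (IllegalStateException e) {
            firstResult = "error";
        }
        String secondResult = demo.current().blockingGet();
        return firstResult + "," + secondResult + "," + demo.attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Callers that still hold a reference to the old Single keep seeing the cached error; only callers that fetch the field again get the fresh instance.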
Code example origin: ReactiveX/RxJava
@Test
public void crossCancel() {
    PublishSubject<Integer> ps = PublishSubject.create();
    Single<Integer> cache = ps.single(-99).cache();
    final TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
    // ts2 cancels ts1 as soon as it receives the value
    TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            super.onNext(t);
            ts1.cancel();
        }
    };
    cache.toFlowable().subscribe(ts2);
    cache.toFlowable().subscribe(ts1);
    ps.onNext(1);
    ps.onComplete();
    ts1.assertNoValues().assertNoErrors().assertNotComplete();
    ts2.assertResult(1);
}
Code example origin: ReactiveX/RxJava
@Test
public void crossCancelOnError() {
    PublishSubject<Integer> ps = PublishSubject.create();
    Single<Integer> cache = ps.single(-99).cache();
    final TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
    // ts2 cancels ts1 as soon as it receives the error
    TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>() {
        @Override
        public void onError(Throwable t) {
            super.onError(t);
            ts1.cancel();
        }
    };
    cache.toFlowable().subscribe(ts2);
    cache.toFlowable().subscribe(ts1);
    ps.onError(new TestException());
    ts1.assertNoValues().assertNoErrors().assertNotComplete();
    ts2.assertFailure(TestException.class);
}
Code example origin: com.microsoft.rest.v2/client-runtime
/**
 * Creates a buffered HTTP response.
 * @param innerHttpResponse The HTTP response to buffer.
 */
public BufferedHttpResponse(HttpResponse innerHttpResponse) {
    this.innerHttpResponse = innerHttpResponse;
    this.cachedBody = innerHttpResponse.bodyAsByteArray().cache();
}
Code example origin: radixdlt/radixdlt-java
public RadixJsonRpcClient(WebSocketClient wsClient) {
    this.wsClient = wsClient;
    final JsonParser parser = new JsonParser();
    this.messages = this.wsClient.getMessages()
        .map(msg -> parser.parse(msg).getAsJsonObject())
        .publish()
        .refCount();
    if (!CHECK_API_VERSION) {
        this.serverApiVersion = Single.just(API_VERSION);
    } else {
        this.serverApiVersion = jsonRpcCall("Api.getVersion")
            .map(result -> result.getAsJsonObject().get("version").getAsInt())
            .cache();
    }
    this.universeConfig = jsonRpcCall("Universe.getUniverse")
        .map(result -> RadixJson.getGson().fromJson(result, RadixUniverseConfig.class))
        .cache();
}
Code example origin: WaylonBrown/LifecycleAwareRx
/**
 * NOTE: This throws a NoSuchElementException if the item is filtered out since a Single can't be empty, so the
 * onError is called after onDestroy() when using Single().
 *
 * @param upstream
 * @return
 */
@Override
public SingleSource<T> apply(Single<T> upstream) {
    Single<T> transformedStream = upstream
            .cache() // Cache to replay emitted values to late subscriber
            .filter(filterIfDestroyedPredicate) // Filter to stop emitting items once LifecycleOwner is destroyed
            .toSingle();
    setReactiveType((R) transformedStream);
    return transformedStream;
}
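The note in the Javadoc above follows from how filter() and toSingle() interact: filtering a Single yields a Maybe, and converting an empty Maybe back with toSingle() signals NoSuchElementException. A small sketch, assuming RxJava 2.x on the classpath and using a made-up class name FilteredSingleDemo, shows both that outcome and the toSingle(defaultValue) overload as one possible way to avoid it.

```java
import io.reactivex.Single;
import java.util.NoSuchElementException;

// Hypothetical demo class, not part of RxJava or LifecycleAwareRx.
class FilteredSingleDemo {

    // Mirrors the cache().filter(...).toSingle() chain; 'keep' stands in for the predicate result.
    static String run(boolean keep) {
        try {
            return Single.just("value")
                    .cache()
                    .filter(v -> keep) // Single -> Maybe; empty when the item is filtered out
                    .toSingle()        // empty Maybe becomes an error
                    .blockingGet();
        } catch (NoSuchElementException e) {
            return "NoSuchElementException";
        }
    }

    // Same chain, but with a fallback value instead of an error.
    static String runWithDefault(boolean keep) {
        return Single.just("value")
                .cache()
                .filter(v -> keep)
                .toSingle("fallback")
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
        System.out.println(runWithDefault(false));
    }
}
```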
Code example origin: io.knotx/knotx-databridge-core
private Single<JsonObject> fetchServiceData(DataSourceEntry service, KnotContext request) {
    LOGGER.debug("Fetching data from service {} {}", service.getAddress(), service.getParams());
    try {
        return request.getCache()
            .get(service.getCacheKey(), () -> {
                LOGGER.debug("Requesting data from adapter {} with params {}", service.getAddress(),
                    service.getParams());
                return serviceEngine.doServiceCall(service, request).cache();
            });
    } catch (ExecutionException e) {
        LOGGER.fatal("Unable to get service data {}", e);
        return Single.error(e);
    }
}
Code example origin: Cognifide/knotx
private Single<JsonObject> fetchServiceData(ServiceEntry service, KnotContext request) {
    LOGGER.debug("Fetching data from service {} {}", service.getAddress(), service.getParams());
    try {
        return request.getCache()
            .get(service.getCacheKey(), () -> {
                LOGGER.debug("Requesting data from adapter {} with params {}", service.getAddress(),
                    service.getParams());
                return serviceEngine.doServiceCall(service, request).cache();
            });
    } catch (ExecutionException e) {
        LOGGER.fatal("Unable to get service data {}", e);
        return Single.error(e);
    }
}
Code example origin: d4rken/RxShell
public Session(RxShell.Session session, CmdProcessor cmdProcessor) {
    this.session = session;
    this.cmdProcessor = cmdProcessor;
    this.waitFor = session.waitFor().cache();
    this.cancel = session.cancel()
            .doOnComplete(() -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("cancel():doOnComplete");})
            .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "cancel():doOnError");})
            .cache();
    this.close = cmdProcessor.isIdle()
            .filter(i -> i)
            .first(true)
            .flatMap(i -> session.close())
            .doOnSuccess(s -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("close():doOnSuccess %s", s);})
            .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "close():doOnError");})
            .cache();
}