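This article collects code examples for the Java method io.reactivex.Single.doOnError() and shows how Single.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Single.doOnError() method are as follows: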
Package path: io.reactivex.Single
Class name: Single
Method name: doOnError
Calls the shared consumer with the error sent via onError for each SingleObserver that subscribes to the current Single. Scheduler: doOnError does not operate by default on a particular Scheduler.
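The description above says the consumer only observes the error; it does not say the error is consumed. The following is a minimal, stdlib-only sketch of that behavior, written so it can run without RxJava on the classpath. MiniSingle and DoOnErrorSketch are hypothetical names invented for this illustration and are not part of RxJava; the real operator additionally works per subscriber and, as one of the RxJava tests further down shows, reports a CompositeException when the consumer itself throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical stand-in for a Single: yields exactly one value or one error. */
final class MiniSingle<T> {
    private final Callable<T> source;

    private MiniSingle(Callable<T> source) {
        this.source = source;
    }

    static <T> MiniSingle<T> fromCallable(Callable<T> source) {
        return new MiniSingle<>(source);
    }

    /** Peeks at the error as a side effect; the error is still passed downstream. */
    MiniSingle<T> doOnError(Consumer<Throwable> onError) {
        return new MiniSingle<>(() -> {
            try {
                return source.call();
            } catch (Exception e) {
                onError.accept(e); // side effect only
                throw e;           // the error is not swallowed
            }
        });
    }

    /** Runs the source and routes the outcome to exactly one of the two callbacks. */
    void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
        T value;
        try {
            value = source.call();
        } catch (Exception e) {
            onFailure.accept(e);
            return;
        }
        onSuccess.accept(value);
    }
}

class DoOnErrorSketch {
    /** Returns the events recorded for one subscription, in order. */
    static List<String> run(Callable<String> source) {
        List<String> events = new ArrayList<>();
        MiniSingle.fromCallable(source)
                .doOnError(e -> events.add("doOnError:" + e.getMessage()))
                .subscribe(
                        v -> events.add("success:" + v),
                        e -> events.add("observer:" + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        // Success path: the doOnError callback is never invoked.
        System.out.println(run(() -> "ok"));
        // Error path: the callback runs first, then the observer still sees the error.
        System.out.println(run(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

In this sketch the success path records only the success event, while the error path records the doOnError event followed by the observer's error event. That ordering is why the examples below can use doOnError for logging or dismissing a dialog and still need a separate operator such as onErrorResumeNext or onErrorReturnItem to recover.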
Code example source: origin: TeamNewPipe/NewPipe
@NonNull
public Single<StreamInfo> getStream() {
    return ExtractorHelper.getStreamInfo(this.serviceId, this.url, false)
            .subscribeOn(Schedulers.io())
            .doOnError(throwable -> error = throwable);
}
Code example source: origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnError() {
    error.doOnError(null);
}
Code example source: origin: pockethub/PocketHub
/**
 * Add issue filter to store
 * <p/>
 * This method may perform file I/O and should never be called on the
 * UI-thread
 *
 * @param filter
 */
public Single<IssueFilter> addIssueFilter(final IssueFilter filter) {
    return Single.fromCallable(() -> {
        final File cache = new File(root, "issue_filters.ser");
        Collection<IssueFilter> filters = read(cache);
        if (filters == null) {
            filters = new HashSet<>();
        }
        if (filters.add(filter)) {
            write(cache, filters);
        }
        return filter;
    }).doOnError(e -> Log.d(TAG, "Exception adding issue filter", e));
}
Code example source: origin: pockethub/PocketHub
/**
 * Remove issue filter from store
 * <p/>
 * This method may perform file I/O and should never be called on the
 * UI-thread
 *
 * @param filter
 */
public Single<IssueFilter> removeIssueFilter(IssueFilter filter) {
    return Single.fromCallable(() -> {
        final File cache = new File(root, "issue_filters.ser");
        Collection<IssueFilter> filters = read(cache);
        if (filters != null && filters.remove(filter)) {
            write(cache, filters);
        }
        return filter;
    }).doOnError(e -> Log.d(TAG, "Exception removing issue filter", e));
}
Code example source: origin: ReactiveX/RxJava
@Test
public void testOnErrorCalledOnScheduler() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Thread> thread = new AtomicReference<Thread>();
    Single.<String>error(new Exception())
            .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    thread.set(Thread.currentThread());
                    latch.countDown();
                }
            })
            .onErrorResumeNext(Single.just(""))
            .subscribe();
    latch.await();
    assertNotEquals(Thread.currentThread(), thread.get());
}
Code example source: origin: ReactiveX/RxJava
@Test
public void doOnError() {
    final Object[] event = { null };
    Single.error(new TestException()).doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable e) throws Exception {
            event[0] = e;
        }
    })
    .test();
    assertTrue(event[0].toString(), event[0] instanceof TestException);
}
Code example source: origin: ReactiveX/RxJava
@Test
public void onErrorSuccess() {
    final int[] call = { 0 };
    Single.just(1)
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable v) throws Exception {
                    call[0]++;
                }
            })
            .test()
            .assertResult(1);
    assertEquals(0, call[0]);
}
Code example source: origin: pockethub/PocketHub
public static <U> SingleTransformer<U, U> bindToLifecycle(Context context, CharSequence message) {
    return upstream -> {
        final MaterialDialog progressDialog = new MaterialDialog.Builder(context)
                .content(message)
                .progress(true, 0)
                .build();
        return upstream
                .doOnSubscribe(disposable -> progressDialog.show())
                .doOnSuccess(u -> progressDialog.dismiss())
                .doOnError(throwable -> progressDialog.dismiss());
    };
}
Code example source: origin: ReactiveX/RxJava
@Test
public void onErrorCrashes() {
    TestObserver<Object> to = Single.error(new TestException("Outer"))
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable v) throws Exception {
                    throw new TestException("Inner");
                }
            })
            .test()
            .assertFailure(CompositeException.class);
    List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
    TestHelper.assertError(errors, 0, TestException.class, "Outer");
    TestHelper.assertError(errors, 1, TestException.class, "Inner");
}
Code example source: origin: Polidea/RxAndroidBle
private void reset() {
    hasCachedResults = false;
    this.deviceServicesObservable = getListOfServicesFromGatt()
            .map(wrapIntoRxBleDeviceServices())
            .switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
            .doOnSuccess(Functions.actionConsumer(new Action() {
                @Override
                public void run() throws Exception {
                    hasCachedResults = true;
                }
            }))
            .doOnError(Functions.actionConsumer(new Action() {
                @Override
                public void run() {
                    reset();
                }
            }))
            .cache();
}
Code example source: origin: chat-sdk/chat-sdk-android
protected Single<Thread> createAndOpenThread(String name, List<User> users) {
    return ChatSDK.thread().createThread(name, users)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSuccess(thread -> {
                if (thread != null) {
                    ChatSDK.ui().startChatActivityForID(getApplicationContext(), thread.getEntityID());
                }
            }).doOnError(throwable -> ToastHelper.show(getApplicationContext(), R.string.create_thread_with_users_fail_toast));
}
Code example source: origin: eneim/toro
void refresh() throws IOException {
    Disposable disposable = //
            Observable.just(ytApi.playlistItems()
                    .list(YOUTUBE_PLAYLIST_PART)
                    .setPlaylistId(YOUTUBE_PLAYLIST_ID)
                    .setPageToken(null)
                    .setFields(YOUTUBE_PLAYLIST_FIELDS)
                    .setMaxResults(YOUTUBE_PLAYLIST_MAX_RESULTS)
                    .setKey(API_KEY) //
            )
                    .map(AbstractGoogleClientRequest::execute)
                    .map(PlaylistItemListResponse::getItems)
                    .flatMap(playlistItems -> Observable.fromIterable(playlistItems)
                            .map(item -> item.getSnippet().getResourceId().getVideoId()))
                    .toList()
                    .map(ids -> ytApi.videos().list(YOUTUBE_VIDEOS_PART).setFields(YOUTUBE_VIDEOS_FIELDS) //
                            .setKey(API_KEY).setId(TextUtils.join(",", ids)).execute())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnError(
                            throwable -> Log.e(TAG, "accept() called with: throwable = [" + throwable + "]"))
                    .doOnSuccess(
                            response -> Log.d(TAG, "accept() called with: response = [" + response + "]"))
                    .onErrorReturnItem(new VideoListResponse()) // Bad work around
                    .doOnSuccess(liveData::setValue)
                    .subscribe();
    disposables.add(disposable);
}
Code example source: origin: io.gravitee.elasticsearch/gravitee-common-elasticsearch
@Override
public Single<Integer> getVersion() throws ElasticsearchException {
    return httpClient
            .get(URL_ROOT)
            .rxSend()
            .doOnError(throwable -> logger.error("Unable to get a connection to Elasticsearch", throwable))
            .map(response -> mapper.readTree(response.bodyAsString()).path("version").path("number").asText())
            .map(sVersion -> {
                float result = Float.valueOf(sVersion.substring(0, 3));
                int version = Integer.valueOf(sVersion.substring(0, 1));
                if (result < 2) {
                    logger.warn("Please upgrade to Elasticsearch 2 or later. version={}", version);
                }
                return version;
            });
}
Code example source: origin: xiancloud/xian
/**
 * @param url the URL to submit to
 * @param paramsMap parameter map
 * @return the response
 */
public static Single<String> post(String url, Map<String, String> paramsMap) {
    String applicationXwwwformUrlEncodedString = HttpUtil.xwwwformEncode(paramsMap);
    return HttpUtil.post(url, applicationXwwwformUrlEncodedString, new HashMap<String, String>() {{
        put("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
        put("Accept", "application/json;charset=utf-8");
    }}).doOnError(LOG::error);
}
Code example source: origin: com.blackducksoftware.bdio/bdio-tinkerpop
@Override
public Publisher<?> persistFramedEntries(Flowable<Map<String, Object>> framedEntries) {
    return framedEntries
            .flatMapIterable(BdioDocument::toGraphNodes)
            .reduce(new NodeAccumulator(), NodeAccumulator::addNode)
            .doOnSuccess(NodeAccumulator::addEdges)
            .doAfterSuccess(NodeAccumulator::commitTx)
            .doOnError(this::handleError)
            .toFlowable();
}
Code example source: origin: gravitee-io/graviteeio-access-management
@Override
public Single<Certificate> update(Certificate certificate) {
    // update date
    certificate.setUpdatedAt(new Date());
    return certificateRepository.update(certificate)
            .flatMap(certificate1 -> {
                // Reload domain to take care about certificate update
                Event event = new Event(Type.CERTIFICATE, new Payload(certificate1.getId(), certificate1.getDomain(), Action.UPDATE));
                return domainService.reload(certificate1.getDomain(), event).flatMap(domain1 -> Single.just(certificate1));
            })
            .doOnError(ex -> {
                LOGGER.error("An error occurs while trying to update a certificate", ex);
                // Note: an exception thrown inside doOnError reaches the observer wrapped,
                // together with the original error, in a CompositeException.
                throw new TechnicalManagementException("An error occurs while trying to update a certificate", ex);
            });
}
Code example source: origin: gentics/mesh
@Override
public Single<JsonObject> getDocument(String index, String uuid) {
    String fullIndex = installationPrefix() + index;
    return client.getDocument(fullIndex, DEFAULT_TYPE, uuid).async()
            .map(response -> {
                if (log.isDebugEnabled()) {
                    log.debug("Get object {" + uuid + "} from index {" + fullIndex + "}");
                }
                return response;
            }).timeout(getOptions().getTimeout(), TimeUnit.MILLISECONDS).doOnError(error -> {
                log.error("Could not get object {" + uuid + "} from index {" + fullIndex + "}", error);
            });
}
Code example source: origin: io.knotx/knotx-core
private Observable<ModuleDescriptor> deployVerticle(final JsonObject config,
        final ModuleDescriptor module) {
    return vertx
            .rxDeployVerticle(module.getName(), getModuleOptions(config, module.getAlias()))
            .map(deployId ->
                    new ModuleDescriptor(module)
                            .setDeploymentId(deployId)
                            .setState(DeploymentState.SUCCESS))
            .doOnError(error ->
                    LOGGER.error("Can't deploy {}: {}", module.toDescriptorLine(), error))
            .onErrorResumeNext((err) ->
                    Single.just(new ModuleDescriptor(module).setState(DeploymentState.FAILED)))
            .toObservable();
}
Code example source: origin: Cognifide/knotx
private Observable<ModuleDescriptor> deployVerticle(final JsonObject config,
        final ModuleDescriptor module) {
    return vertx
            .rxDeployVerticle(module.getName(), getModuleOptions(config, module.getAlias()))
            .map(deployId ->
                    new ModuleDescriptor(module)
                            .setDeploymentId(deployId)
                            .setState(DeploymentState.SUCCESS))
            .doOnError(error ->
                    LOGGER.error("Can't deploy {}: {}", module.toDescriptorLine(), error))
            .onErrorResumeNext((err) ->
                    Single.just(new ModuleDescriptor(module).setState(DeploymentState.FAILED)))
            .toObservable();
}
Code example source: origin: d4rken/RxShell
public Session(RxShell.Session session, CmdProcessor cmdProcessor) {
    this.session = session;
    this.cmdProcessor = cmdProcessor;
    this.waitFor = session.waitFor().cache();
    this.cancel = session.cancel()
            .doOnComplete(() -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("cancel():doOnComplete"); })
            .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "cancel():doOnError"); })
            .cache();
    this.close = cmdProcessor.isIdle()
            .filter(i -> i)
            .first(true)
            .flatMap(i -> session.close())
            .doOnSuccess(s -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("close():doOnSuccess %s", s); })
            .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "close():doOnError"); })
            .cache();
}