Usage and code examples of the io.reactivex.Single.doOnError() method

x33g5p2x, reposted on 2022-01-29 under Other

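This article collects code examples of the io.reactivex.Single.doOnError() method in Java and shows how Single.doOnError() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Single.doOnError() method:
Package path: io.reactivex.Single
Class name: Single
Method name: doOnError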

About Single.doOnError

Calls the shared consumer with the error sent via onError for each SingleObserver that subscribes to the current Single. Scheduler: doOnError does not operate by default on a particular Scheduler.
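The contract described above can be illustrated without the library. The following is a minimal plain-Java sketch, not RxJava's implementation: `MiniSingle`, `MiniObserver`, and `DoOnErrorSketch` are hypothetical stand-ins invented for this illustration. It assumes only the behavior the description states: the callback observes the error as a side effect, and the same error still reaches the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

public class DoOnErrorSketch {

    /** Hypothetical stand-in for SingleObserver: receives exactly one terminal event. */
    interface MiniObserver<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for Single; a toy model, NOT RxJava's implementation. */
    interface MiniSingle<T> {
        void subscribe(MiniObserver<T> observer);

        /** Peeks at the error with a shared callback, then forwards the same error downstream. */
        default MiniSingle<T> doOnError(Consumer<Throwable> callback) {
            Objects.requireNonNull(callback, "callback is null");
            MiniSingle<T> upstream = this;
            return downstream -> upstream.subscribe(new MiniObserver<T>() {
                @Override
                public void onSuccess(T value) {
                    downstream.onSuccess(value);   // the callback is skipped on success
                }

                @Override
                public void onError(Throwable error) {
                    callback.accept(error);        // side effect only
                    downstream.onError(error);     // the error is not swallowed
                }
            });
        }

        static <T> MiniSingle<T> just(T value) {
            return observer -> observer.onSuccess(value);
        }

        static <T> MiniSingle<T> error(Throwable t) {
            return observer -> observer.onError(t);
        }
    }

    /** Runs an error path and a success path; returns a log of what happened, in order. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObserver<Integer> observer = new MiniObserver<Integer>() {
            @Override
            public void onSuccess(Integer v) {
                log.add("observer success: " + v);
            }

            @Override
            public void onError(Throwable e) {
                log.add("observer error: " + e.getMessage());
            }
        };

        MiniSingle.<Integer>error(new IllegalStateException("boom"))
                .doOnError(e -> log.add("doOnError: " + e.getMessage()))
                .subscribe(observer);

        MiniSingle.just(1)
                .doOnError(e -> log.add("doOnError: " + e.getMessage()))
                .subscribe(observer);

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints `doOnError: boom`, `observer error: boom`, and `observer success: 1`, in that order. The callback fires only on the error path and does not consume the error, which is why the projects below mostly use doOnError for logging or cleanup and handle recovery separately (for example with onErrorResumeNext).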

Code examples

Code example source: origin: TeamNewPipe/NewPipe

@NonNull
public Single<StreamInfo> getStream() {
  return ExtractorHelper.getStreamInfo(this.serviceId, this.url, false)
      .subscribeOn(Schedulers.io())
      .doOnError(throwable -> error = throwable);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnError() {
  error.doOnError(null);
}

Code example source: origin: pockethub/PocketHub

/**
 * Add issue filter to store
 * <p/>
 * This method may perform file I/O and should never be called on the
 * UI-thread
 *
 * @param filter
 */
public Single<IssueFilter> addIssueFilter(final IssueFilter filter) {
  return Single.fromCallable(() -> {
    final File cache = new File(root, "issue_filters.ser");
    Collection<IssueFilter> filters = read(cache);
    if (filters == null) {
      filters = new HashSet<>();
    }
    if (filters.add(filter)) {
      write(cache, filters);
    }
    return filter;
  }).doOnError(e -> Log.d(TAG, "Exception adding issue filter", e));
}

Code example source: origin: pockethub/PocketHub

/**
 * Remove issue filter from store
 * <p/>
 * This method may perform file I/O and should never be called on the
 * UI-thread
 *
 * @param filter
 */
public Single<IssueFilter> removeIssueFilter(IssueFilter filter) {
  return Single.fromCallable(() -> {
    final File cache = new File(root, "issue_filters.ser");
    Collection<IssueFilter> filters = read(cache);
    if (filters != null && filters.remove(filter)) {
      write(cache, filters);
    }

    return filter;
  }).doOnError(e -> Log.d(TAG, "Exception removing issue filter", e));
}

Code example source: origin: ReactiveX/RxJava

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicReference<Thread> thread = new AtomicReference<Thread>();
  Single.<String>error(new Exception())
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
      .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
          thread.set(Thread.currentThread());
          latch.countDown();
        }
      })
      .onErrorResumeNext(Single.just(""))
      .subscribe();
  latch.await();
  assertNotEquals(Thread.currentThread(), thread.get());
}

Code example source: origin: ReactiveX/RxJava

@Test
public void doOnError() {
  final Object[] event = { null };
  Single.error(new TestException()).doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      event[0] = e;
    }
  })
  .test();
  assertTrue(event[0].toString(), event[0] instanceof TestException);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void onErrorSuccess() {
  final int[] call = { 0 };
  Single.just(1)
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable v) throws Exception {
      call[0]++;
    }
  })
  .test()
  .assertResult(1);
  assertEquals(0, call[0]);
}

Code example source: origin: pockethub/PocketHub

public static <U> SingleTransformer<U, U> bindToLifecycle(Context context, CharSequence message) {
  return upstream -> {
    final MaterialDialog progressDialog = new MaterialDialog.Builder(context)
        .content(message)
        .progress(true, 0)
        .build();

    return upstream
        .doOnSubscribe(disposable -> progressDialog.show())
        .doOnSuccess(u -> progressDialog.dismiss())
        .doOnError(throwable -> progressDialog.dismiss());
  };
}

Code example source: origin: ReactiveX/RxJava

@Test
public void onErrorCrashes() {
  TestObserver<Object> to = Single.error(new TestException("Outer"))
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable v) throws Exception {
      throw new TestException("Inner");
    }
  })
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

Code example source: origin: Polidea/RxAndroidBle

private void reset() {
  hasCachedResults = false;
  this.deviceServicesObservable = getListOfServicesFromGatt()
      .map(wrapIntoRxBleDeviceServices())
      .switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
      .doOnSuccess(Functions.actionConsumer(new Action() {
        @Override
        public void run() throws Exception {
          hasCachedResults = true;
        }
      }))
      .doOnError(Functions.actionConsumer(new Action() {
        @Override
        public void run() {
          reset();
        }
      }))
      .cache();
}

Code example source: origin: chat-sdk/chat-sdk-android

protected Single<Thread> createAndOpenThread (String name, List<User> users) {
  return ChatSDK.thread().createThread(name, users)
      .observeOn(AndroidSchedulers.mainThread())
      .doOnSuccess(thread -> {
        if (thread != null) {
          ChatSDK.ui().startChatActivityForID(getApplicationContext(), thread.getEntityID());
        }
      }).doOnError(throwable -> ToastHelper.show(getApplicationContext(), R.string.create_thread_with_users_fail_toast));
}

Code example source: origin: eneim/toro

void refresh() throws IOException {
  Disposable disposable = //
    Observable.just(ytApi.playlistItems()
      .list(YOUTUBE_PLAYLIST_PART)
      .setPlaylistId(YOUTUBE_PLAYLIST_ID)
      .setPageToken(null)
      .setFields(YOUTUBE_PLAYLIST_FIELDS)
      .setMaxResults(YOUTUBE_PLAYLIST_MAX_RESULTS)
      .setKey(API_KEY)  //
    )
      .map(AbstractGoogleClientRequest::execute)
      .map(PlaylistItemListResponse::getItems)
      .flatMap(playlistItems -> Observable.fromIterable(playlistItems)
        .map(item -> item.getSnippet().getResourceId().getVideoId()))
      .toList()
      .map(ids -> ytApi.videos().list(YOUTUBE_VIDEOS_PART).setFields(YOUTUBE_VIDEOS_FIELDS) //
        .setKey(API_KEY).setId(TextUtils.join(",", ids)).execute())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnError(
        throwable -> Log.e(TAG, "accept() called with: throwable = [" + throwable + "]"))
      .doOnSuccess(
        response -> Log.d(TAG, "accept() called with: response = [" + response + "]"))
      .onErrorReturnItem(new VideoListResponse()) // Bad work around
      .doOnSuccess(liveData::setValue)
      .subscribe();
  disposables.add(disposable);
}

Code example source: origin: io.gravitee.elasticsearch/gravitee-common-elasticsearch

@Override
public Single<Integer> getVersion() throws ElasticsearchException {
  return httpClient
      .get(URL_ROOT)
      .rxSend()
      .doOnError(throwable -> logger.error("Unable to get a connection to Elasticsearch", throwable))
      .map(response -> mapper.readTree(response.bodyAsString()).path("version").path("number").asText())
      .map(sVersion -> {
        float result = Float.valueOf(sVersion.substring(0, 3));
        int version = Integer.valueOf(sVersion.substring(0, 1));
        if (result < 2) {
          logger.warn("Please upgrade to Elasticsearch 2 or later. version={}", version);
        }
        return version;
      });
}

Code example source: origin: xiancloud/xian

/**
 * @param url       the URL to post to
 * @param paramsMap parameter map
 * @return the response
 */
public static Single<String> post(String url, Map<String, String> paramsMap) {
  String applicationXwwwformUrlEncodedString = HttpUtil.xwwwformEncode(paramsMap);
  return HttpUtil.post(url, applicationXwwwformUrlEncodedString, new HashMap<String, String>() {{
    put("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
    put("Accept", "application/json;charset=utf-8");
  }}).doOnError(LOG::error);
}

Code example source: origin: com.blackducksoftware.bdio/bdio-tinkerpop

@Override
public Publisher<?> persistFramedEntries(Flowable<Map<String, Object>> framedEntries) {
  return framedEntries
      .flatMapIterable(BdioDocument::toGraphNodes)
      .reduce(new NodeAccumulator(), NodeAccumulator::addNode)
      .doOnSuccess(NodeAccumulator::addEdges)
      .doAfterSuccess(NodeAccumulator::commitTx)
      .doOnError(this::handleError)
      .toFlowable();
}

Code example source: origin: gravitee-io/graviteeio-access-management

@Override
public Single<Certificate> update(Certificate certificate) {
  // update date
  certificate.setUpdatedAt(new Date());
  return certificateRepository.update(certificate)
      .flatMap(certificate1 -> {
        // Reload domain to take care about certificate update
        Event event = new Event(Type.CERTIFICATE, new Payload(certificate1.getId(), certificate1.getDomain(), Action.UPDATE));
        return domainService.reload(certificate1.getDomain(), event).flatMap(domain1 -> Single.just(certificate1));
      })
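      .doOnError(ex -> {
        LOGGER.error("An error occurs while trying to update a certificate", ex);
        // Note: an exception thrown from a doOnError callback does not replace the original error;
        // RxJava delivers both to the observer wrapped in a CompositeException.
        throw new TechnicalManagementException("An error occurs while trying to update a certificate", ex);
      });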
}

Code example source: origin: gentics/mesh

@Override
public Single<JsonObject> getDocument(String index, String uuid) {
  String fullIndex = installationPrefix() + index;
  return client.getDocument(fullIndex, DEFAULT_TYPE, uuid).async()
    .map(response -> {
      if (log.isDebugEnabled()) {
        log.debug("Get object {" + uuid + "} from index {" + fullIndex + "}");
      }
      return response;
    }).timeout(getOptions().getTimeout(), TimeUnit.MILLISECONDS).doOnError(error -> {
      log.error("Could not get object {" + uuid + "} from index {" + fullIndex + "}", error);
    });
}

Code example source: origin: io.knotx/knotx-core

private Observable<ModuleDescriptor> deployVerticle(final JsonObject config,
  final ModuleDescriptor module) {
 return vertx
   .rxDeployVerticle(module.getName(), getModuleOptions(config, module.getAlias()))
   .map(deployId ->
     new ModuleDescriptor(module)
       .setDeploymentId(deployId)
       .setState(DeploymentState.SUCCESS))
   .doOnError(error ->
     LOGGER.error("Can't deploy {}: {}", module.toDescriptorLine(), error))
   .onErrorResumeNext((err) ->
     Single.just(new ModuleDescriptor(module).setState(DeploymentState.FAILED)))
   .toObservable();
}

Code example source: origin: Cognifide/knotx

private Observable<ModuleDescriptor> deployVerticle(final JsonObject config,
  final ModuleDescriptor module) {
 return vertx
   .rxDeployVerticle(module.getName(), getModuleOptions(config, module.getAlias()))
   .map(deployId ->
     new ModuleDescriptor(module)
       .setDeploymentId(deployId)
       .setState(DeploymentState.SUCCESS))
   .doOnError(error ->
     LOGGER.error("Can't deploy {}: {}", module.toDescriptorLine(), error))
   .onErrorResumeNext((err) ->
     Single.just(new ModuleDescriptor(module).setState(DeploymentState.FAILED)))
   .toObservable();
}

Code example source: origin: d4rken/RxShell

public Session(RxShell.Session session, CmdProcessor cmdProcessor) {
  this.session = session;
  this.cmdProcessor = cmdProcessor;
  this.waitFor = session.waitFor().cache();
  this.cancel = session.cancel()
      .doOnComplete(() -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("cancel():doOnComplete");})
      .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "cancel():doOnError");})
      .cache();
  this.close = cmdProcessor.isIdle()
      .filter(i -> i)
      .first(true)
      .flatMap(i -> session.close())
      .doOnSuccess(s -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("close():doOnSuccess %s", s);})
      .doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "close():doOnError");})
      .cache();
}
