io.reactivex.Single.observeOn()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(150)

本文整理了Java中io.reactivex.Single.observeOn()方法的一些代码示例,展示了Single.observeOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.observeOn()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:observeOn

Single.observeOn介绍

[英]Modifies a Single to emit its item (or notify of its error) on a specified Scheduler, asynchronously.

Scheduler: you specify which Scheduler this operator will use.
[中]在指定的计划程序上异步修改单个项以发出其项(或通知其错误)。
调度器:指定该操作员将使用的调度器。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public SingleSource<Object> apply(Single<Object> s) throws Exception {
    return s.observeOn(Schedulers.single());
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void observeOnNull() {
  just1.observeOn(null);
}

代码示例来源:origin: pockethub/PocketHub

private void loadMarkdown(String raw, Repository repo) {
    MarkdownLoader.load(getActivity(), raw, repo, imageGetter, true)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(rendered -> {
          bodyText.setText(rendered);
          showLoading(false);
        } , e -> ToastUtils.show(getActivity(), R.string.error_rendering_markdown));
  }
}

代码示例来源:origin: pockethub/PocketHub

private Observable<Page<NotificationThread>> getPageAndNext(int i) {
  return ServiceGenerator.createService(getActivity(), NotificationService.class)
      .getNotifications(filters, i)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .flatMapObservable(response -> {
        Page<NotificationThread> page = response.body();
        if (page.next() == null) {
          return Observable.just(page);
        }
        return Observable.just(page).concatWith(getPageAndNext(page.next()));
      });
}

代码示例来源:origin: pockethub/PocketHub

@Override
public void onItemClick(@NonNull Item item, @NonNull View view) {
  if (item instanceof UserItem) {
    User result = ((UserItem) item).getUser();
    ServiceGenerator.createService(getContext(), UserService.class)
        .getUser(result.login())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .as(AutoDisposeUtils.bindToLifecycle(this))
        .subscribe(response ->
            startActivity(UserViewActivity.createIntent(response.body())));
  }
}

代码示例来源:origin: pockethub/PocketHub

@Override
public void onItemClick(@NonNull Item item, @NonNull View view) {
  if (item instanceof ContributorItem) {
    User contributor = ((ContributorItem) item).getUser();
    ServiceGenerator.createService(getContext(), UserService.class)
        .getUser(contributor.login())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .as(AutoDisposeUtils.bindToLifecycle(this))
        .subscribe(response ->
            startActivity(UserViewActivity.createIntent(response.body())));
  }
}

代码示例来源:origin: pockethub/PocketHub

@Override
public void readNotification(@NonNull NotificationThread thread) {
  ServiceGenerator.createService(getActivity(), NotificationService.class)
      .markNotificationRead(thread.id())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> refresh(), Throwable::printStackTrace);
}

代码示例来源:origin: pockethub/PocketHub

private void forkRepository() {
  ServiceGenerator.createService(this, RepositoryForkService.class)
      .createFork(repository.owner().login(), repository.name())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> {
        Repository repo = response.body();
        UriLauncherActivity.launchUri(this, Uri.parse(repo.htmlUrl()));
      }, e -> ToastUtils.show(this, R.string.error_forking_repository));
}

代码示例来源:origin: pockethub/PocketHub

private void unstarGist() {
  ToastUtils.show(getActivity(), R.string.unstarring_gist);
  ServiceGenerator.createService(getActivity(), GistService.class)
      .unstarGist(gistId)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> starred = !(response.code() == 204),
          e -> ToastUtils.show((Activity) getContext(), e.getMessage()));
}

代码示例来源:origin: pockethub/PocketHub

private void checkFollowingUserStatus() {
    followingStatusChecked = false;
    ServiceGenerator.createService(this, UserFollowerService.class)
        .isFollowing(user.login())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .as(AutoDisposeUtils.bindToLifecycle(this))
        .subscribe(response -> {
          isFollowing = response.code() == 204;
          followingStatusChecked = true;
          invalidateOptionsMenu();
        });
  }
}

代码示例来源:origin: pockethub/PocketHub

private void starGist() {
  ToastUtils.show(getActivity(), R.string.starring_gist);
  ServiceGenerator.createService(getActivity(), GistService.class)
      .starGist(gistId)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> starred = response.code() == 204,
          e -> ToastUtils.show((Activity) getContext(), e.getMessage()));
}

代码示例来源:origin: pockethub/PocketHub

@Override
public void readNotifications(@Nullable Repository repository) {
  ServiceGenerator.createService(getActivity(), NotificationService.class)
      .markAllRepositoryNotificationsRead(repository.owner().login(),
          repository.name(), NotificationReadRequest.builder().build())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> refresh(), Throwable::printStackTrace);
}

代码示例来源:origin: pockethub/PocketHub

@Override
protected void createComment(String comment) {
  CommentRequest commentRequest = CommentRequest.builder()
      .body(comment)
      .build();
  ServiceGenerator.createService(this, IssueCommentService.class)
      .createIssueComment(repositoryId.owner().login(), repositoryId.name(), issueNumber, commentRequest)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> finish(response.body()));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Single.just(1).observeOn(Schedulers.single()));
}

代码示例来源:origin: pockethub/PocketHub

private void followUser() {
  UserFollowerService service = ServiceGenerator.createService(this, UserFollowerService.class);
  Single<Response<Void>> followSingle;
  if (isFollowing) {
    followSingle = service.unfollowUser(user.login());
  } else{
    followSingle = service.followUser(user.login());
  }
  followSingle.subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(aVoid -> isFollowing = !isFollowing,
          e -> ToastUtils.show(this, isFollowing ? R.string.error_unfollowing_person : R.string.error_following_person));
}

代码示例来源:origin: pockethub/PocketHub

private void checkStarredRepositoryStatus() {
  starredStatusChecked = false;
  ServiceGenerator.createService(this, StarringService.class)
      .checkIfRepositoryIsStarred(repository.owner().login(), repository.name())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .as(AutoDisposeUtils.bindToLifecycle(this))
      .subscribe(response -> {
        isStarred = response.code() == 204;
        starredStatusChecked = true;
        invalidateOptionsMenu();
      });
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noWinnerSuccessDispose() throws Exception {
  for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
    final AtomicBoolean interrupted = new AtomicBoolean();
    final CountDownLatch cdl = new CountDownLatch(1);
    Single.ambArray(
      Single.just(1)
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.computation()),
      Single.never()
    )
    .subscribe(new BiConsumer<Object, Throwable>() {
      @Override
      public void accept(Object v, Throwable e) throws Exception {
        assertNotNull(v);
        assertNull(e);
        interrupted.set(Thread.currentThread().isInterrupted());
        cdl.countDown();
      }
    });
    assertTrue(cdl.await(500, TimeUnit.SECONDS));
    assertFalse("Interrupted!", interrupted.get());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Test
  public void noWinnerErrorDispose() throws Exception {
    final TestException ex = new TestException();
    for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
      final AtomicBoolean interrupted = new AtomicBoolean();
      final CountDownLatch cdl = new CountDownLatch(1);

      Single.ambArray(
        Single.error(ex)
          .subscribeOn(Schedulers.single())
          .observeOn(Schedulers.computation()),
        Single.never()
      )
      .subscribe(new BiConsumer<Object, Throwable>() {
        @Override
        public void accept(Object v, Throwable e) throws Exception {
          assertNull(v);
          assertNotNull(e);
          interrupted.set(Thread.currentThread().isInterrupted());
          cdl.countDown();
        }
      });

      assertTrue(cdl.await(500, TimeUnit.SECONDS));
      assertFalse("Interrupted!", interrupted.get());
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void error() {
    Single.error(new TestException())
    .observeOn(Schedulers.single())
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertFailure(TestException.class);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testAsync() {
  TestSubscriber<String> ts = new TestSubscriber<String>();
  Single.just("Hello")
      .subscribeOn(Schedulers.io())
      .map(new Function<String, String>() {
        @Override
        public String apply(String v) {
          System.out.println("SubscribeOn Thread: " + Thread.currentThread());
          return v;
        }
      })
      .observeOn(Schedulers.computation())
      .map(new Function<String, String>() {
        @Override
        public String apply(String v) {
          System.out.println("ObserveOn Thread: " + Thread.currentThread());
          return v;
        }
      })
      .toFlowable().subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertValueSequence(Arrays.asList("Hello"));
}

相关文章

微信公众号

最新文章

更多