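This article collects code examples for the Java method io.reactivex.Single.observeOn() and shows how Single.observeOn() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as useful references. Details of Single.observeOn() are as follows: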
Package: io.reactivex
Class: Single
Method: observeOn
Modifies a Single to emit its item (or notify of its error) on a specified Scheduler, asynchronously.
Scheduler: you specify which Scheduler this operator will use.
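To make the description above concrete, here is a minimal sketch of the idea behind observeOn using only the Java standard library. It is not RxJava's implementation: `MiniSingle`, `just`, `deliveryThreadName`, and the thread name `observer-thread` are all hypothetical names invented for this illustration, and a plain `Executor` stands in for a Scheduler. The sketch only shows that the operator wraps the downstream callback so that the value or error is handed over by a task submitted to the chosen executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical illustration only; none of these types belong to RxJava.
class ObserveOnSketch {

    /** Stand-in for a Single: hands exactly one value or one error to the callback. */
    interface MiniSingle<T> {
        void subscribe(BiConsumer<T, Throwable> callback);

        /** Returns a MiniSingle whose result is delivered by a task run on the executor. */
        default MiniSingle<T> observeOn(Executor executor) {
            if (executor == null) {
                throw new NullPointerException("executor is null");
            }
            MiniSingle<T> upstream = this;
            return callback -> upstream.subscribe(
                    (value, error) -> executor.execute(() -> callback.accept(value, error)));
        }

        /** Emits the given value synchronously on the subscribing thread. */
        static <T> MiniSingle<T> just(T value) {
            return callback -> callback.accept(value, null);
        }
    }

    /** Subscribes through observeOn and reports the name of the thread that received the value. */
    static String deliveryThreadName() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observer-thread"));
        AtomicReference<String> seenOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            MiniSingle.just(1)
                    .observeOn(worker)
                    .subscribe((value, error) -> {
                        seenOn.set(Thread.currentThread().getName());
                        done.countDown();
                    });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no result within 5 seconds");
            }
            return seenOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Delivered on: " + deliveryThreadName());
    }
}
```

Running `main` prints `Delivered on: observer-thread`, even though `just` emits on the calling thread. The real operator follows the same outline but hands delivery to a Scheduler, which is why the examples below pair it with `subscribeOn` to choose where the work itself runs.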
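Code example source: ReactiveX/RxJava
// Fragment: the body of an anonymous Function; the enclosing call is not part of this excerpt.
@Override
public SingleSource<Object> apply(Single<Object> s) throws Exception {
    return s.observeOn(Schedulers.single());
}
});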
Code example source: ReactiveX/RxJava
// Passing a null Scheduler is rejected with a NullPointerException.
@Test(expected = NullPointerException.class)
public void observeOnNull() {
    just1.observeOn(null);
}
Code example source: pockethub/PocketHub
private void loadMarkdown(String raw, Repository repo) {
    MarkdownLoader.load(getActivity(), raw, repo, imageGetter, true)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(rendered -> {
                bodyText.setText(rendered);
                showLoading(false);
            }, e -> ToastUtils.show(getActivity(), R.string.error_rendering_markdown));
}
Code example source: pockethub/PocketHub
private Observable<Page<NotificationThread>> getPageAndNext(int i) {
    return ServiceGenerator.createService(getActivity(), NotificationService.class)
            .getNotifications(filters, i)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMapObservable(response -> {
                Page<NotificationThread> page = response.body();
                if (page.next() == null) {
                    return Observable.just(page);
                }
                // Emit this page, then recursively fetch and append the following pages.
                return Observable.just(page).concatWith(getPageAndNext(page.next()));
            });
}
Code example source: pockethub/PocketHub
@Override
public void onItemClick(@NonNull Item item, @NonNull View view) {
    if (item instanceof UserItem) {
        User result = ((UserItem) item).getUser();
        ServiceGenerator.createService(getContext(), UserService.class)
                .getUser(result.login())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .as(AutoDisposeUtils.bindToLifecycle(this))
                .subscribe(response ->
                        startActivity(UserViewActivity.createIntent(response.body())));
    }
}
Code example source: pockethub/PocketHub
@Override
public void onItemClick(@NonNull Item item, @NonNull View view) {
    if (item instanceof ContributorItem) {
        User contributor = ((ContributorItem) item).getUser();
        ServiceGenerator.createService(getContext(), UserService.class)
                .getUser(contributor.login())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .as(AutoDisposeUtils.bindToLifecycle(this))
                .subscribe(response ->
                        startActivity(UserViewActivity.createIntent(response.body())));
    }
}
Code example source: pockethub/PocketHub
@Override
public void readNotification(@NonNull NotificationThread thread) {
    ServiceGenerator.createService(getActivity(), NotificationService.class)
            .markNotificationRead(thread.id())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> refresh(), Throwable::printStackTrace);
}
Code example source: pockethub/PocketHub
private void forkRepository() {
    ServiceGenerator.createService(this, RepositoryForkService.class)
            .createFork(repository.owner().login(), repository.name())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> {
                Repository repo = response.body();
                UriLauncherActivity.launchUri(this, Uri.parse(repo.htmlUrl()));
            }, e -> ToastUtils.show(this, R.string.error_forking_repository));
}
Code example source: pockethub/PocketHub
private void unstarGist() {
    ToastUtils.show(getActivity(), R.string.unstarring_gist);
    ServiceGenerator.createService(getActivity(), GistService.class)
            .unstarGist(gistId)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            // HTTP 204 means the gist was unstarred, so it is no longer starred.
            .subscribe(response -> starred = response.code() != 204,
                    e -> ToastUtils.show((Activity) getContext(), e.getMessage()));
}
Code example source: pockethub/PocketHub
private void checkFollowingUserStatus() {
    followingStatusChecked = false;
    ServiceGenerator.createService(this, UserFollowerService.class)
            .isFollowing(user.login())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> {
                isFollowing = response.code() == 204;
                followingStatusChecked = true;
                invalidateOptionsMenu();
            });
}
Code example source: pockethub/PocketHub
private void starGist() {
    ToastUtils.show(getActivity(), R.string.starring_gist);
    ServiceGenerator.createService(getActivity(), GistService.class)
            .starGist(gistId)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> starred = response.code() == 204,
                    e -> ToastUtils.show((Activity) getContext(), e.getMessage()));
}
Code example source: pockethub/PocketHub
@Override
public void readNotifications(@Nullable Repository repository) {
    ServiceGenerator.createService(getActivity(), NotificationService.class)
            .markAllRepositoryNotificationsRead(repository.owner().login(),
                    repository.name(), NotificationReadRequest.builder().build())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> refresh(), Throwable::printStackTrace);
}
Code example source: pockethub/PocketHub
@Override
protected void createComment(String comment) {
    CommentRequest commentRequest = CommentRequest.builder()
            .body(comment)
            .build();
    ServiceGenerator.createService(this, IssueCommentService.class)
            .createIssueComment(repositoryId.owner().login(), repositoryId.name(),
                    issueNumber, commentRequest)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> finish(response.body()));
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Single.just(1).observeOn(Schedulers.single()));
}
Code example source: pockethub/PocketHub
private void followUser() {
    UserFollowerService service = ServiceGenerator.createService(this, UserFollowerService.class);
    Single<Response<Void>> followSingle;
    if (isFollowing) {
        followSingle = service.unfollowUser(user.login());
    } else {
        followSingle = service.followUser(user.login());
    }
    followSingle.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(aVoid -> isFollowing = !isFollowing,
                    e -> ToastUtils.show(this, isFollowing
                            ? R.string.error_unfollowing_person
                            : R.string.error_following_person));
}
Code example source: pockethub/PocketHub
private void checkStarredRepositoryStatus() {
    starredStatusChecked = false;
    ServiceGenerator.createService(this, StarringService.class)
            .checkIfRepositoryIsStarred(repository.owner().login(), repository.name())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDisposeUtils.bindToLifecycle(this))
            .subscribe(response -> {
                isStarred = response.code() == 204;
                starredStatusChecked = true;
                invalidateOptionsMenu();
            });
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noWinnerSuccessDispose() throws Exception {
    for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
        final AtomicBoolean interrupted = new AtomicBoolean();
        final CountDownLatch cdl = new CountDownLatch(1);
        Single.ambArray(
            Single.just(1)
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.computation()),
            Single.never()
        )
        .subscribe(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(Object v, Throwable e) throws Exception {
                assertNotNull(v);
                assertNull(e);
                // Record whether the delivering thread was interrupted.
                interrupted.set(Thread.currentThread().isInterrupted());
                cdl.countDown();
            }
        });
        assertTrue(cdl.await(500, TimeUnit.SECONDS));
        assertFalse("Interrupted!", interrupted.get());
    }
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noWinnerErrorDispose() throws Exception {
    final TestException ex = new TestException();
    for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
        final AtomicBoolean interrupted = new AtomicBoolean();
        final CountDownLatch cdl = new CountDownLatch(1);
        Single.ambArray(
            Single.error(ex)
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.computation()),
            Single.never()
        )
        .subscribe(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(Object v, Throwable e) throws Exception {
                assertNull(v);
                assertNotNull(e);
                // Record whether the delivering thread was interrupted.
                interrupted.set(Thread.currentThread().isInterrupted());
                cdl.countDown();
            }
        });
        assertTrue(cdl.await(500, TimeUnit.SECONDS));
        assertFalse("Interrupted!", interrupted.get());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    Single.error(new TestException())
        .observeOn(Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void testAsync() {
    TestSubscriber<String> ts = new TestSubscriber<String>();
    Single.just("Hello")
        .subscribeOn(Schedulers.io())
        // This map sits before observeOn, so it runs on the io() scheduler.
        .map(new Function<String, String>() {
            @Override
            public String apply(String v) {
                System.out.println("SubscribeOn Thread: " + Thread.currentThread());
                return v;
            }
        })
        .observeOn(Schedulers.computation())
        // This map sits after observeOn, so it runs on the computation() scheduler.
        .map(new Function<String, String>() {
            @Override
            public String apply(String v) {
                System.out.println("ObserveOn Thread: " + Thread.currentThread());
                return v;
            }
        })
        .toFlowable().subscribe(ts);
    ts.awaitTerminalEvent();
    ts.assertValueSequence(Arrays.asList("Hello"));
}