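This page collects code examples for the Java method io.reactivex.Single.takeUntil() and shows how Single.takeUntil() is used in practice. The examples are taken mainly from selected projects on GitHub, Stack Overflow, and Maven, and are intended as reference material. Details of Single.takeUntil() are as follows: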
Package path: io.reactivex.Single
Class name: Single
Method name: takeUntil
Returns a Single that emits the item emitted by the source Single until a Completable terminates. If other terminates first, the returned Single signals a CancellationException instead of calling SingleObserver#onSuccess(Object).
Scheduler: takeUntil does not operate by default on a particular Scheduler.
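The behavior described above can be modeled without RxJava. The following is a minimal sketch that uses java.util.concurrent.CompletableFuture as a stand-in for Single; the class name TakeUntilSketch and its takeUntil helper are hypothetical and are not part of RxJava. It assumes three outcomes: the source's result is delivered if it arrives first, a normal termination of other produces a CancellationException, and a failure of other is forwarded as-is.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stdlib model of the takeUntil semantics; this is not RxJava code. */
public class TakeUntilSketch {

    /**
     * Completes with the source's outcome unless {@code other} terminates first.
     * A normal termination of {@code other} yields a CancellationException;
     * a failure of {@code other} is forwarded unchanged.
     */
    public static <T> CompletableFuture<T> takeUntil(
            CompletableFuture<T> source, CompletableFuture<?> other) {
        CompletableFuture<T> result = new CompletableFuture<>();

        // Register other first (an assumption of this sketch) so that an
        // already-terminated other wins over an already-completed source.
        other.whenComplete((ignored, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.completeExceptionally(new CancellationException());
            }
        });

        // Forward the source's value or error; this is a no-op if result is already settled.
        source.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });

        return result;
    }
}
```

Only the first terminal signal settles the returned future, so a late value from the source is ignored once other has terminated. This mirrors the outcomes asserted by the tests quoted further down.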
Code example source: ReactiveX/RxJava
/**
 * Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon
 * emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
 * {@link SingleObserver#onSuccess(Object)}.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param other
 *            the Single whose emitted item will cause {@code takeUntil} to emit the item from the source Single
 * @param <E>
 *            the type of item emitted by {@code other}
 * @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item
 * @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <E> Single<T> takeUntil(final SingleSource<? extends E> other) {
    ObjectHelper.requireNonNull(other, "other is null");
    return takeUntil(new SingleToFlowable<E>(other));
}
Code example source: ReactiveX/RxJava
/**
 * Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon
 * termination of {@code other}, this will emit a {@link CancellationException} rather than go to
 * {@link SingleObserver#onSuccess(Object)}.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param other
 *            the Completable whose termination will cause {@code takeUntil} to emit the item from the source
 *            Single
 * @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates.
 * @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> takeUntil(final CompletableSource other) {
    ObjectHelper.requireNonNull(other, "other is null");
    return takeUntil(new CompletableToFlowable<T>(other));
}
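Because takeUntil reports the interruption as an error, callers may want to separate that CancellationException from genuine failures. The sketch below uses java.util.concurrent.CompletableFuture as a stand-in for Single; CancellationFallback and orFallbackOnCancel are hypothetical names and are not part of RxJava. In RxJava itself, an error-handling operator such as Single.onErrorResumeNext would play the equivalent role; that variant is not shown here.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical helper; a stdlib analogy, not an RxJava API. */
public class CancellationFallback {

    /**
     * Replaces a CancellationException with {@code fallback} and
     * propagates every other failure to the returned future.
     */
    public static <T> CompletableFuture<T> orFallbackOnCancel(
            CompletableFuture<T> upstream, T fallback) {
        return upstream.handle((value, error) -> {
            if (error == null) {
                return value;
            }
            // Dependent stages may deliver the failure wrapped in a CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            if (cause instanceof CancellationException) {
                return fallback;
            }
            throw new CompletionException(cause);
        });
    }
}
```

Whether a fallback value is appropriate depends on the application; in some cases the cancellation should simply be allowed to propagate.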
Code example source: ReactiveX/RxJava
@Test
public void withPublisherDispose() {
    // Checks that disposing the resulting Single is reflected by its Disposable.
    TestHelper.checkDisposed(Single.never().takeUntil(Flowable.never()));
}
Code example source: ReactiveX/RxJava
@Test
public void flowableCancelDelayed() {
    // other emits items without completing; the source never terminates.
    Single.never()
    .takeUntil(new Flowable<Integer>() {
        @Override
        protected void subscribeActual(Subscriber<? super Integer> s) {
            s.onSubscribe(new BooleanSubscription());
            s.onNext(1);
            s.onNext(2);
        }
    })
    .test()
    .assertFailure(CancellationException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherSignalsAndCompletes() {
    // other emits and completes immediately; no error may reach the plugin error handler.
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Single.just(1).takeUntil(Flowable.just(1).take(1))
        .test()
        .assertFailure(CancellationException.class);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
@Test
public void otherOnNextPublisher() {
    // An item from other arrives while the source is still pending.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp)
    .test();
    pp.onNext(1);
    to.assertFailure(CancellationException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherOnCompletePublisher() {
    // other completes without emitting while the source is still pending.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp)
    .test();
    pp.onComplete();
    to.assertFailure(CancellationException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherOnCompleteCompletable() {
    // Same as above, with other converted to a Completable.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
    .test();
    pp.onComplete();
    to.assertFailure(CancellationException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void mainSuccessPublisher() {
    // The source succeeds before other signals, so its value is delivered.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp)
    .test();
    source.onNext(1);
    source.onComplete();
    to.assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void mainErrorPublisher() {
    // An error from the source is passed through.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp)
    .test();
    source.onError(new TestException());
    to.assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherErrorPublisher() {
    // An error from other is forwarded instead of a CancellationException.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp)
    .test();
    pp.onError(new TestException());
    to.assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void mainSuccessCompletable() {
    // The source succeeds before the Completable terminates.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
    .test();
    source.onNext(1);
    source.onComplete();
    to.assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void otherErrorSingle() {
    // An error from the other Single is forwarded.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
    .test();
    pp.onError(new TestException());
    to.assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void mainErrorSingle() {
    // An error from the source is passed through while the other Single is pending.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
    .test();
    source.onError(new TestException());
    to.assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherOnNextSingle() {
    // The other Single succeeds while the source is still pending.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
    .test();
    pp.onNext(1);
    pp.onComplete();
    to.assertFailure(CancellationException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherErrorCompletable() {
    // An error from the Completable is forwarded.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
    .test();
    pp.onError(new TestException());
    to.assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void mainSuccessSingle() {
    // The source succeeds before the other Single signals.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
    .test();
    source.onNext(1);
    source.onComplete();
    to.assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void mainErrorCompletable() {
    // An error from the source is passed through while the Completable is pending.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
    .test();
    source.onError(new TestException());
    to.assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void otherOnNextCompletable() {
    // ignoreElements() drops the item; the completion that follows ends the Completable.
    PublishProcessor<Integer> pp = PublishProcessor.create();
    PublishProcessor<Integer> source = PublishProcessor.create();
    TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
    .test();
    pp.onNext(1);
    pp.onComplete();
    to.assertFailure(CancellationException.class);
}
Code example source: ReactiveX/RxJava
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleOrError().takeUntil(pp2).test();