io.reactivex.Single.takeUntil(): usage and code examples

x33g5p2x, reposted on 2022-01-29 under Other

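This article collects code examples for the Java method io.reactivex.Single.takeUntil() and shows how Single.takeUntil() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful reference material. Details of Single.takeUntil() are as follows:
Qualified class name: io.reactivex.Single
Class name: Single
Method name: takeUntil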

About Single.takeUntil

Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon termination of other, this will emit a CancellationException rather than go to SingleObserver#onSuccess(Object).

Scheduler: takeUntil does not operate by default on a particular Scheduler.

As the examples below show, other may also be a SingleSource or a Publisher, and an error signaled by other is relayed to the observer as is.
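The contract described above can be modeled with the JDK alone. The following is a minimal sketch, not RxJava code and not how RxJava implements the operator: it assumes `CompletableFuture` as a stand-in for both the source Single and `other`, and the names `TakeUntilModel`, `takeUntil`, and `describe` are hypothetical helpers introduced only for illustration. It shows the three outcomes the tests below exercise: the source wins, `other` terminates first, and `other` fails first.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// JDK-only model of the takeUntil contract; this is NOT RxJava code.
final class TakeUntilModel {

    // Hypothetical helper: mirrors `source` unless `other` terminates first.
    static <T> CompletableFuture<T> takeUntil(CompletableFuture<T> source,
                                              CompletableFuture<?> other) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Relay the source's value or error; a no-op if result is already completed.
        source.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        // Relay other's error, or signal cancellation when other terminates normally.
        other.whenComplete((ignored, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.completeExceptionally(new CancellationException());
            }
        });
        return result;
    }

    // Renders the terminal state of an already completed future as text.
    static String describe(CompletableFuture<?> future) {
        return future
            .handle((value, error) -> error == null
                ? "success: " + value
                : "error: " + error.getClass().getSimpleName())
            .join();
    }

    public static void main(String[] args) {
        // Case 1: the source succeeds before other terminates.
        CompletableFuture<Integer> source1 = new CompletableFuture<>();
        CompletableFuture<Void> other1 = new CompletableFuture<>();
        CompletableFuture<Integer> result1 = takeUntil(source1, other1);
        source1.complete(1);
        other1.complete(null);
        System.out.println(describe(result1)); // success: 1

        // Case 2: other terminates first, so the result is a cancellation.
        CompletableFuture<Integer> source2 = new CompletableFuture<>();
        CompletableFuture<Void> other2 = new CompletableFuture<>();
        CompletableFuture<Integer> result2 = takeUntil(source2, other2);
        other2.complete(null);
        source2.complete(1);
        System.out.println(describe(result2)); // error: CancellationException

        // Case 3: other fails first, so its error is relayed unchanged.
        CompletableFuture<Integer> source3 = new CompletableFuture<>();
        CompletableFuture<Void> other3 = new CompletableFuture<>();
        CompletableFuture<Integer> result3 = takeUntil(source3, other3);
        other3.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(result3)); // error: IllegalStateException
    }
}
```

Unlike the real operator, this model does not dispose the losing side; it only relies on the fact that a `CompletableFuture` keeps the first completion and ignores later ones.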

Code examples

Code example source: ReactiveX/RxJava

/**
 * Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon
 * emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
 * {@link SingleObserver#onSuccess(Object)}.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param other
 *            the Single whose emitted item will cause {@code takeUntil} to signal a
 *            {@link CancellationException} instead of the item from the source Single
 * @param <E>
 *            the type of item emitted by {@code other}
 * @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item
 * @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <E> Single<T> takeUntil(final SingleSource<? extends E> other) {
  ObjectHelper.requireNonNull(other, "other is null");
  return takeUntil(new SingleToFlowable<E>(other));
}

Code example source: ReactiveX/RxJava

/**
 * Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon
 * termination of {@code other}, this will emit a {@link CancellationException} rather than go to
 * {@link SingleObserver#onSuccess(Object)}.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param other
 *            the Completable whose termination will cause {@code takeUntil} to signal a
 *            {@link CancellationException} instead of the item from the source Single
 * @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates.
 * @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
 */
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> takeUntil(final CompletableSource other) {
  ObjectHelper.requireNonNull(other, "other is null");
  return takeUntil(new CompletableToFlowable<T>(other));
}

Code example source: ReactiveX/RxJava

@Test
public void withPublisherDispose() {
  TestHelper.checkDisposed(Single.never().takeUntil(Flowable.never()));
}

Code example source: ReactiveX/RxJava

@Test
public void flowableCancelDelayed() {
  Single.never()
  .takeUntil(new Flowable<Integer>() {
    @Override
    protected void subscribeActual(Subscriber<? super Integer> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onNext(1);
      s.onNext(2);
    }
  })
  .test()
  .assertFailure(CancellationException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherSignalsAndCompletes() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Single.just(1).takeUntil(Flowable.just(1).take(1))
    .test()
    .assertFailure(CancellationException.class);
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void otherOnNextPublisher() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp)
  .test();
  pp.onNext(1);
  to.assertFailure(CancellationException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherOnCompletePublisher() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp)
  .test();
  pp.onComplete();
  to.assertFailure(CancellationException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherOnCompleteCompletable() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
  .test();
  pp.onComplete();
  to.assertFailure(CancellationException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void mainSuccessPublisher() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp)
  .test();
  source.onNext(1);
  source.onComplete();
  to.assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void mainErrorPublisher() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp)
  .test();
  source.onError(new TestException());
  to.assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherErrorPublisher() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp)
  .test();
  pp.onError(new TestException());
  to.assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void mainSuccessCompletable() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
  .test();
  source.onNext(1);
  source.onComplete();
  to.assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void otherErrorSingle() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
  .test();
  pp.onError(new TestException());
  to.assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void mainErrorSingle() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
  .test();
  source.onError(new TestException());
  to.assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherOnNextSingle() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
  .test();
  pp.onNext(1);
  pp.onComplete();
  to.assertFailure(CancellationException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherErrorCompletable() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
  .test();
  pp.onError(new TestException());
  to.assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void mainSuccessSingle() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.single(-99))
  .test();
  source.onNext(1);
  source.onComplete();
  to.assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void mainErrorCompletable() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
  .test();
  source.onError(new TestException());
  to.assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void otherOnNextCompletable() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestObserver<Integer> to = source.single(-99).takeUntil(pp.ignoreElements())
  .test();
  pp.onNext(1);
  pp.onComplete();
  to.assertFailure(CancellationException.class);
}

Code example source: ReactiveX/RxJava

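// pp1 is not declared in this excerpt; it is assumed here to be a PublishProcessor like pp2.
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleOrError().takeUntil(pp2).test();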
