Usage and code examples for the io.reactivex.schedulers.Schedulers.single() method

x33g5p2x, reposted on 2022-01-29 in Other

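This article collects code examples for the Java method io.reactivex.schedulers.Schedulers.single() and shows how Schedulers.single() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of the Schedulers.single() method:
Package path: io.reactivex.schedulers.Schedulers
Class name: Schedulers
Method name: single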

Introduction to Schedulers.single

Returns a default, shared, single-thread-backed Scheduler instance for work requiring strongly-sequential execution on the same background thread.

Uses:

  • event loop
  • support Schedulers.from(Executor) and from(ExecutorService) with delayed scheduling
  • support benchmarks that pipeline data from some thread to another thread and avoid core-bashing of computation's round-robin nature
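To make "strongly-sequential execution on the same background thread" concrete, here is a minimal JDK-only sketch. It does not use RxJava at all: it assumes a plain single-thread ExecutorService as a rough analogue of a single-thread-backed scheduler, and the class and method names (SingleThreadOrderDemo, runSequentially) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only analogue (an assumption, not RxJava code): one background thread
// runs all submitted tasks one after another, in submission order.
class SingleThreadOrderDemo {

    /** Runs n tasks on one background thread; returns "index@threadName" per task, in execution order. */
    static List<String> runSequentially(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int index = i;
            executor.execute(() -> log.add(index + "@" + Thread.currentThread().getName()));
        }
        executor.shutdown(); // already-submitted tasks still run to completion
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // Every line shows the same thread name, with indices in ascending order.
        runSequentially(3).forEach(System.out::println);
    }
}
```

Because only one thread ever executes the tasks, no task can overtake another, which is the property the uses listed above (an event loop, pipelining data to one other thread) rely on.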

Unhandled errors will be delivered to the scheduler Thread's java.lang.Thread.UncaughtExceptionHandler.

This type of scheduler is less sensitive to leaking io.reactivex.Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
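The point about timed/delayed tasks can be illustrated with a JDK-only sketch. It is an analogy under stated assumptions, not RxJava code: a ScheduledExecutorService stands in for the scheduler, and ScheduledFuture.cancel stands in for disposing the worker or task. The class and method names (DelayedTaskCancelDemo, cancelledTaskRan) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only analogue (an assumption, not RxJava code): a delayed task that is
// cancelled before its delay elapses never runs; one left alone still fires later.
class DelayedTaskCancelDemo {

    /** Returns true if the cancelled delayed task ran anyway (it should not). */
    static boolean cancelledTaskRan() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean ran = new AtomicBoolean(false);
        CountDownLatch laterTaskDone = new CountDownLatch(1);
        try {
            ScheduledFuture<?> first = executor.schedule(() -> ran.set(true), 200, TimeUnit.MILLISECONDS);
            executor.schedule(laterTaskDone::countDown, 300, TimeUnit.MILLISECONDS);
            first.cancel(false); // stands in for disposing the task before it fires
            // Wait for the later task: by then the first would have fired if still scheduled.
            if (!laterTaskDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("later task did not run in time");
            }
            return ran.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // release the background thread
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled task ran: " + cancelledTaskRan());
    }
}
```

The second task in the sketch plays the role of a delayed task nobody cancelled: it still executes once its delay elapses, which is the "unexpected" execution the paragraph above warns about.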

If the RxJavaPlugins#setFailOnNonBlockingScheduler(boolean) is set to true, attempting to execute operators that block while running on this scheduler will throw an IllegalStateException.

You can control certain properties of this standard scheduler via system properties that have to be set before the Schedulers class is referenced in your code.

Supported system properties (System.getProperty()):

  • rx2.single-priority (int): sets the thread priority of the #single() Scheduler, default is Thread#NORM_PRIORITY
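As a rough illustration of the property above, the following JDK-only sketch sets rx2.single-priority and reads it back as an int with Thread.NORM_PRIORITY as the default. The clamping to the legal thread-priority range and the names SinglePriorityProperty and resolvePriority are assumptions made for illustration; the source does not show how the library itself parses the value.

```java
// JDK-only sketch (an assumption about how such a property can be consumed,
// not the library's own code).
class SinglePriorityProperty {

    /** Property name taken from the documentation above. */
    static final String KEY = "rx2.single-priority";

    /** Reads the property as an int, defaulting to NORM_PRIORITY and clamping to the legal range. */
    static int resolvePriority() {
        int requested = Integer.getInteger(KEY, Thread.NORM_PRIORITY);
        return Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, requested));
    }

    public static void main(String[] args) {
        // Per the documentation, this must happen before the Schedulers class is first referenced.
        System.setProperty(KEY, "7");
        System.out.println(resolvePriority());
    }
}
```

The same value can also be passed on the command line as -Drx2.single-priority=7, which avoids any ordering concern inside the program.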

The default value of this scheduler can be overridden at initialization time via the RxJavaPlugins#setInitSingleSchedulerHandler(io.reactivex.functions.Function) plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException. Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins#setSingleSchedulerHandler(io.reactivex.functions.Function) method.

It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the RxJavaPlugins#createSingleScheduler(ThreadFactory) method. Note that such custom instances require a manual call to Scheduler#shutdown() to allow the JVM to exit or the (J2EE) container to unload properly.
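A custom ThreadFactory of the kind mentioned above might look like the following sketch. ThreadFactory is a plain JDK interface, so the block has no RxJava dependency; the class name NamedDaemonThreadFactory, the naming scheme, and the choice of daemon threads are assumptions for illustration only.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ThreadFactory: names each thread "<prefix>-<n>" and marks it as a daemon.
class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread t = new Thread(task, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(true); // design choice of this sketch, not something the source prescribes
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedDaemonThreadFactory("my-single");
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

An instance of such a factory is what would be handed to RxJavaPlugins#createSingleScheduler(ThreadFactory). Daemon threads do not by themselves keep the JVM alive, but the documentation above still calls for shutting down custom scheduler instances manually.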

Operators on the base reactive classes that use this scheduler are marked with the @io.reactivex.annotations.SchedulerSupport(io.reactivex.annotations.SchedulerSupport#SINGLE) annotation.

Code examples

Code example source: ReactiveX/RxJava

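// Fragment: the apply() method of an anonymous function, apparently a
// Function<Completable, CompletableSource>; the enclosing call is truncated in the source.
@Override
public CompletableSource apply(Completable c) throws Exception {
  return c.observeOn(Schedulers.single());
}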

Code example source: ReactiveX/RxJava

@Override
public void onSubscribe(final Disposable d) {
  Schedulers.single().scheduleDirect(new Runnable() {
    @Override
    public void run() {
      d.dispose();
    }
  }, 550, TimeUnit.MILLISECONDS);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void replaySelectorTimeBoundedUnitNull() {
  just1.replay(new Function<Observable<Integer>, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Observable<Integer> v) {
      return v;
    }
  }, 1, null, Schedulers.single());
}

Code example source: ReactiveX/RxJava

@Test
public void overrideSingleScheduler() {
  try {
    RxJavaPlugins.setSingleSchedulerHandler(replaceWithImmediate);
    assertSame(ImmediateThinScheduler.INSTANCE, Schedulers.single());
  } finally {
    RxJavaPlugins.reset();
  }
  // make sure the reset worked
  assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.single());
}

Code example source: ReactiveX/RxJava

/**
 * Shuts down the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void shutdown() {
  computation().shutdown();
  io().shutdown();
  newThread().shutdown();
  single().shutdown();
  trampoline().shutdown();
  SchedulerPoolFactory.shutdown();
}

Code example source: ReactiveX/RxJava

@Test
public void emitLastTimedCustomScheduler() {
  Observable.just(1)
  .sample(1, TimeUnit.DAYS, Schedulers.single(), true)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void restartTimer() {
  Observable.range(1, 5)
  .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true)
  .test()
  .assertResult(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Single.just(1)
  .unsubscribeOn(Schedulers.single()));
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Maybe.just(1)
  .unsubscribeOn(Schedulers.single()));
}

Code example source: ReactiveX/RxJava

@Test
public void just() {
  Maybe.just(1)
  .unsubscribeOn(Schedulers.single())
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(2, 1, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, 2, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Flowable.range(1, 5)
      .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(2, 1));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, 2));
}

Code example source: ReactiveX/RxJava

@Test
public void inputSyncFused() {
  Flowable.range(1, 5)
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void neverFallbackScheduler() {
  Maybe.never()
  .timeout(1, TimeUnit.MILLISECONDS, Schedulers.single(), Maybe.just(2))
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(2);
}

Code example source: ReactiveX/RxJava

@Test
public void restartTimer() {
  Flowable.range(1, 5)
  .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
  .flatMap(Functions.<Flowable<Integer>>identity())
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Single.error(new TestException())
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void inputAsyncFusedError() {
  UnicastSubject<Integer> us = UnicastSubject.create();
  TestObserver<Integer> to = us.observeOn(Schedulers.single()).test();
  us.onError(new TestException());
  to
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void doubleObserveOn() {
  Flowable.just(1).hide()
  .observeOn(Schedulers.computation())
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void sizeAndTimeBoundReplayError() {
  ReplaySubject<Integer> rp = ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, Schedulers.single(), 2);
  rp.onNext(1);
  rp.onNext(2);
  rp.onNext(3);
  rp.onNext(4);
  rp.onError(new TestException());
  rp.test()
  .assertFailure(TestException.class, 3, 4);
}

Code example source: ReactiveX/RxJava

@Test
public void takeHalf() {
  int elements = 1024;
  Flowable.range(0, elements * 2).unsubscribeOn(Schedulers.single())
  .take(elements)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertValueCount(elements)
  .assertComplete()
  .assertNoErrors()
  .assertSubscribed();
}

Code example source: ReactiveX/RxJava

@Test
public void sizeAndTimeBoundReplayError() {
  ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.DAYS, Schedulers.single(), 2);
  rp.onNext(1);
  rp.onNext(2);
  rp.onNext(3);
  rp.onNext(4);
  rp.onError(new TestException());
  rp.test()
  .assertFailure(TestException.class, 3, 4);
}
