io.reactivex.schedulers.Schedulers.newThread()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(101)

本文整理了Java中io.reactivex.schedulers.Schedulers.newThread()方法的一些代码示例,展示了Schedulers.newThread()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Schedulers.newThread()方法的具体详情如下:
包路径:io.reactivex.schedulers.Schedulers
类名称:Schedulers
方法名:newThread

Schedulers.newThread介绍

[英]Returns a default, shared Scheduler instance that creates a new Thread for each unit of work.

The default implementation of this scheduler creates a new, single-threaded ScheduledExecutorService for each invocation of the Scheduler#scheduleDirect(Runnable) (plus its overloads) and Scheduler#createWorker()methods, thus an unbounded number of worker threads may be created that can result in system slowdowns or OutOfMemoryError. Therefore, for casual uses or when implementing an operator, the Worker instances must be disposed via io.reactivex.Scheduler.Worker#dispose().

Unhandled errors will be delivered to the scheduler Thread's java.lang.Thread.UncaughtExceptionHandler.

You can control certain properties of this standard scheduler via system properties that have to be set before the Schedulers class is referenced in your code.

Supported system properties ( System.getProperty()):

  • rx2.newthread-priority (int): sets the thread priority of the #newThread() Scheduler, default is Thread#NORM_PRIORITY

The default value of this scheduler can be overridden at initialization time via the RxJavaPlugins#setInitNewThreadSchedulerHandler(io.reactivex.functions.Function) plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException. Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins#setNewThreadSchedulerHandler(io.reactivex.functions.Function) method.

It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the RxJavaPlugins#createNewThreadScheduler(ThreadFactory) method. Note that such custom instances require a manual call to Scheduler#shutdown() to allow the JVM to exit or the (J2EE) container to unload properly.

Operators on the base reactive classes that use this scheduler are marked with the @ io.reactivex.annotations.SchedulerSupport( io.reactivex.annotations.SchedulerSupport#NEW_THREAD) annotation.
[中]返回一个默认的共享调度程序实例,该实例为每个工作单元创建一个新线程。
此调度器的默认实现为调度器#scheduleDirect(Runnable)(加上其重载)和调度器#createWorker()方法的每次调用创建一个新的单线程ScheduleExecutorService,因此可能会创建数量不限的工作线程,这可能会导致系统速度减慢或OutOfMemoryError。因此,对于临时使用或实现运算符时,必须通过io来处理工作实例。reactivex。调度程序。工人#处置()。
未经处理的错误将被传递到调度程序线程的java。朗。丝线。未捕获的异常处理程序。
您可以通过在代码中引用Schedulers类之前必须设置的系统属性来控制此标准计划程序的某些属性。
支持的系统属性(system.getProperty()):
*rx2。newthread priority(int):设置#newthread()调度程序的线程优先级,默认为线程#NORM_priority
在初始化时,可以通过RxJavaPlugins#setInitNewThreadSchedulerHandler(io.reactivex.functions.Function)插件方法覆盖此调度程序的默认值。请注意,由于可能的初始化周期,使用任何其他调度程序返回方法都将导致NullPointerException。一旦Schedulers类被初始化,就可以通过RxJavaPlugins#setNewThreadSchedulerHandler(io.reactivex.functions.Function)方法重写返回的调度器实例。
通过RxJavaPlugins#createNewThreadScheduler(ThreadFactory)方法,可以使用自定义ThreadFactory创建此调度程序的新实例。请注意,此类自定义实例需要手动调用Scheduler#shutdown()以允许JVM退出或(J2EE)容器正确卸载。
使用此调度程序的基本反应类上的运算符标记为@io。reactivex。注释。SchedulerSupport(io.reactivex.annotations.SchedulerSupport#NEW_THREAD)注释。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  return group.subscribeOn(Schedulers.newThread()).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t1) {
      System.out.println("Received: " + t1 + " on group : " + group.getKey());
      return "first groups: " + t1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testAsyncChild() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(0, 100000).observeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public Flowable<String> apply(final GroupedFlowable<Integer, Integer> group) {
  return group.subscribeOn(Schedulers.newThread()).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t1) {
      System.out.println("Received: " + t1 + " on group : " + group.getKey());
      return "first groups: " + t1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void overrideNewThreadScheduler() {
  try {
    RxJavaPlugins.setNewThreadSchedulerHandler(replaceWithImmediate);
    assertSame(ImmediateThinScheduler.INSTANCE, Schedulers.newThread());
  } finally {
    RxJavaPlugins.reset();
  }
  // make sure the reset worked
  assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.newThread());
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Shuts down the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void shutdown() {
  computation().shutdown();
  io().shutdown();
  newThread().shutdown();
  single().shutdown();
  trampoline().shutdown();
  SchedulerPoolFactory.shutdown();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 20000)
public void testNoStackOverFlow() {
  Observable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Starts the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void start() {
  computation().start();
  io().start();
  newThread().start();
  single().start();
  trampoline().start();
  SchedulerPoolFactory.start();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Observable<Integer> xs = Observable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 1500)
public void timerNewThread() {
  Completable c = Completable.timer(500, TimeUnit.MILLISECONDS, Schedulers.newThread());
  c.blockingAwait();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onScheduleNewThread() throws InterruptedException {
  onSchedule(Schedulers.newThread().createWorker());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testAsynchronousRun() {
  Observable.range(1, 2).concatMapEager(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t) {
      return Observable.range(1, 1000).subscribeOn(Schedulers.computation());
    }
  }).observeOn(Schedulers.newThread()).subscribe(to);
  to.awaitTerminalEvent(5, TimeUnit.SECONDS);
  to.assertNoErrors();
  to.assertValueCount(2000);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testTakeObserveOn() {
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  TestSubscriber<Object> ts = new TestSubscriber<Object>(subscriber);
  INFINITE_OBSERVABLE.onBackpressureDrop()
  .observeOn(Schedulers.newThread()).take(1).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  verify(subscriber).onNext(1L);
  verify(subscriber, never()).onNext(2L);
  verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure1() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 100000).takeLast(1)
  .observeOn(Schedulers.newThread())
  .map(newSlowProcessor()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertValue(100000);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testAsyncChild() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, 100000).observeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure2() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
  .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, to.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDefaultNewThreadSchedulerIsInitializedLazily() {
  // unsafe default Scheduler Callable should not be evaluated
  try {
    RxJavaPlugins.setInitNewThreadSchedulerHandler(initReplaceWithImmediate);
    RxJavaPlugins.initNewThreadScheduler(unsafeDefault);
  } finally {
    RxJavaPlugins.reset();
  }
  // make sure the reset worked
  assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.newThread());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Flowable<Integer> xs = Flowable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 20000)
public void testNoStackOverFlow() {
  Flowable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure1() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 100000).takeLast(1)
  .observeOn(Schedulers.newThread())
  .map(newSlowProcessor()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  ts.assertValue(100000);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure2() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
  .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}

相关文章