io.reactivex.schedulers.Schedulers类的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(301)

本文整理了Java中io.reactivex.schedulers.Schedulers类的一些代码示例,展示了Schedulers类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Schedulers类的具体详情如下:
包路径:io.reactivex.schedulers.Schedulers
类名称:Schedulers

Schedulers介绍

[英]Static factory methods for returning standard Scheduler instances.

The initial and runtime values of the various scheduler types can be overridden via the RxJavaPlugins.setInit(scheduler name)SchedulerHandler() and RxJavaPlugins.set(scheduler name)SchedulerHandler() respectively.

Supported system properties ( System.getProperty()):

  • rx2.io-keep-alive-time (long): sets the keep-alive time of the #io() Scheduler workers, default is IoScheduler#KEEP_ALIVE_TIME_DEFAULT
  • rx2.io-priority (int): sets the thread priority of the #io() Scheduler, default is Thread#NORM_PRIORITY
  • rx2.computation-threads (int): sets the number of threads in the #computation() Scheduler, default is the number of available CPUs
  • rx2.computation-priority (int): sets the thread priority of the #computation() Scheduler, default is Thread#NORM_PRIORITY
  • rx2.newthread-priority (int): sets the thread priority of the #newThread() Scheduler, default is Thread#NORM_PRIORITY
  • rx2.single-priority (int): sets the thread priority of the #single() Scheduler, default is Thread#NORM_PRIORITY
  • rx2.purge-enabled (boolean): enables periodic purging of all Scheduler's backing thread pools, default is false
  • rx2.purge-period-seconds (int): specifies the periodic purge interval of all Scheduler's backing thread pools, default is 1 second
    [中]用于返回标准调度程序实例的静态工厂方法。
    各种调度器类型的初始值和运行时值可以通过RxJavaPlugins重写。setInit(调度器名称)SchedulerHandler()和RxJavaPlugins。分别设置(调度程序名称)SchedulerHandler()。
    支持的系统属性(system.getProperty()):
    *rx2。io保持活动时间(长):设置#io()调度程序工作程序的保持活动时间,默认值为IoScheduler#保持活动时间#时间#默认值
    *rx2。io优先级(int):设置#io()调度程序的线程优先级,默认值为线程#NORM_优先级
    *rx2。计算线程(int):设置#computation()调度程序中的线程数,默认为可用CPU数
    *rx2。计算优先级(int):设置#computation()调度程序的线程优先级,默认为线程#NORM_优先级
    *rx2。newthread priority(int):设置#newthread()调度程序的线程优先级,默认为线程#NORM_priority
    *rx2。单优先级(int):设置#single()调度程序的线程优先级,默认为线程#NORM_优先级
    *rx2。purge enabled(布尔):启用定期清除所有计划程序的备份线程池,默认值为false
    *rx2。清除周期秒(int):指定所有计划程序的备份线程池的定期清除间隔,默认值为1秒

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Integer> apply(Integer i) throws Exception {
    return i == 3 ? Observable.just(i) : Observable
        .just(i)
        .delay(1, TimeUnit.MILLISECONDS, Schedulers.io());
  }
})

代码示例来源:origin: ReactiveX/RxJava

/**
 * Shuts down the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void shutdown() {
  computation().shutdown();
  io().shutdown();
  newThread().shutdown();
  single().shutdown();
  trampoline().shutdown();
  SchedulerPoolFactory.shutdown();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected Scheduler getScheduler() {
  return Schedulers.from(executor);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t1) {
    return composer(Observable.range(t1 * 10, 2), subscriptionCount, m)
        .subscribeOn(Schedulers.computation());
  }
}, new BiFunction<Integer, Integer, Integer>() {

代码示例来源:origin: ReactiveX/RxJava

@Override
protected Scheduler getScheduler() {
  return Schedulers.newThread();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doubleObserveOn() {
  Flowable.just(1).hide()
  .observeOn(Schedulers.computation())
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

public ObservableRefCount(ConnectableObservable<T> source) {
  this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public SingleSource<Integer> apply(Integer v) throws Exception {
    return Single.just(v).subscribeOn(Schedulers.computation());
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  return group.subscribeOn(Schedulers.newThread()).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t1) {
      System.out.println("Received: " + t1 + " on group : " + group.getKey());
      return "first groups: " + t1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doubleObserveOnError() {
  Flowable.error(new TestException())
  .observeOn(Schedulers.computation())
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

public FlowableRefCount(ConnectableFlowable<T> source) {
  this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void periodicDirectTaskRaceIO() throws Exception {
  final Scheduler scheduler = Schedulers.io();
  for (int i = 0; i < 100; i++) {
    final Disposable d = scheduler.schedulePeriodicallyDirect(
        Functions.EMPTY_RUNNABLE, 0, 0, TimeUnit.MILLISECONDS);
    Thread.sleep(1);
    d.dispose();
  }
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Starts the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void start() {
  computation().start();
  io().start();
  newThread().start();
  single().start();
  trampoline().start();
  SchedulerPoolFactory.start();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public SingleSource<Integer> apply(Integer v) throws Exception {
    return Single.just(v).subscribeOn(Schedulers.computation());
  }
})

代码示例来源:origin: ReactiveX/RxJava

private static Observable<String> infinite(final AtomicInteger produced) {
  return Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      Disposable bs = Disposables.empty();
      observer.onSubscribe(bs);
      while (!bs.isDisposed()) {
        observer.onNext("onNext");
        produced.incrementAndGet();
      }
    }
  }).subscribeOn(Schedulers.newThread());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doubleObserveOnErrorConditional() {
  Flowable.error(new TestException())
  .observeOn(Schedulers.computation())
  .distinct()
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected Scheduler getScheduler() {
  return Schedulers.from(executor, true);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected Scheduler getScheduler() {
  return Schedulers.trampoline();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void overrideIoScheduler() {
  try {
    RxJavaPlugins.setIoSchedulerHandler(replaceWithImmediate);
    assertSame(ImmediateThinScheduler.INSTANCE, Schedulers.io());
  } finally {
    RxJavaPlugins.reset();
  }
  // make sure the reset worked
  assertNotSame(ImmediateThinScheduler.INSTANCE, Schedulers.io());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void moreThanMaxWorkers() {
  final List<Worker> list = new ArrayList<Worker>();
  SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation();
  mws.createWorkers(max * 2, new WorkerCallback() {
    @Override
    public void onWorker(int i, Worker w) {
      list.add(w);
    }
  });
  assertEquals(max * 2, list.size());
}

相关文章