io.reactivex.Scheduler类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(13.1k)|赞(0)|评价(0)|浏览(96)

本文整理了Java中io.reactivex.Scheduler类的一些代码示例,展示了Scheduler类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Scheduler类的具体详情如下:
包路径:io.reactivex.Scheduler
类名称:Scheduler

Scheduler介绍

[英]A Scheduler is an object that specifies an API for scheduling units of work provided in the form of Runnables to be executed without delay (effectively as soon as possible), after a specified time delay or periodically and represents an abstraction over an asynchronous boundary that ensures these units of work get executed by some underlying task-execution scheme (such as custom Threads, event loop, java.util.concurrent.Executor or Actor system) with some uniform properties and guarantees regardless of the particular underlying scheme.

You can get various standard, RxJava-specific instances of this class via the static methods of the io.reactivex.schedulers.Schedulers utility class.

The so-called Workers of a Scheduler can be created via the #createWorker() method which allow the scheduling of multiple Runnable tasks in an isolated manner. Runnable tasks scheduled on a Worker are guaranteed to be executed sequentially and in a non-overlapping fashion. Non-delayed Runnable tasks are guaranteed to execute in a First-In-First-Out order but their execution may be interleaved with delayed tasks. In addition, outstanding or running tasks can be cancelled together via Worker#dispose() without affecting any other Worker instances of the same Scheduler.

Implementations of the #scheduleDirect and Worker#schedule methods are encouraged to call the io.reactivex.plugins.RxJavaPlugins#onSchedule(Runnable)method to allow a scheduler hook to manipulate (wrap or replace) the original Runnable task before it is submitted to the underlying task-execution scheme.

The default implementations of the scheduleDirect methods provided by this abstract class delegate to the respective schedule methods in the Worker instance created via #createWorker()for each individual Runnable task submitted. Implementors of this class are encouraged to provide a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such Workers for every task. This delegation is done via special wrapper instances around the original Runnable before calling the respective Worker.schedule method. Note that this can lead to multiple RxJavaPlugins.onSchedule calls and potentially multiple hooks applied. Therefore, the default implementations of scheduleDirect (and the Worker#schedulePeriodically(Runnable,long,long,TimeUnit)) wrap the incoming Runnable into a class that implements the io.reactivex.schedulers.SchedulerRunnableIntrospectioninterface which can grant access to the original or hooked Runnable, thus, a repeated RxJavaPlugins.onSchedulecan detect the earlier hook and not apply a new one over again.

The default implementation of #now(TimeUnit) and Worker#now(TimeUnit) methods to return current System#currentTimeMillis() value in the desired time unit. Custom Scheduler implementations can override this to provide specialized time accounting (such as virtual time to be advanced programmatically). Note that operators requiring a Scheduler may rely on either of the now() calls provided by Scheduler or Worker respectively, therefore, it is recommended they represent a logically consistent source of the current time.

The default implementation of the Worker#schedulePeriodically(Runnable,long,long,TimeUnit) method uses the Worker#schedule(Runnable,long,TimeUnit) for scheduling the Runnable task periodically. The algorithm calculates the next absolute time when the task should run again and schedules this execution based on the relative time between it and Worker#now(TimeUnit). However, drifts or changes in the system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart. Therefore, the default implementation uses the #clockDriftTolerance() value (set via rx2.scheduler.drift-tolerance in minutes) to detect a drift in Worker#now(TimeUnit) and re-adjust the absolute/relative time calculation accordingly.

The default implementations of #start() and #shutdown() do nothing and should be overridden if the underlying task-execution scheme supports stopping and restarting itself.

If the Scheduler is shut down or a Worker is disposed, the schedule methods should return the io.reactivex.disposables.Disposables#disposed() singleton instance indicating the shut down/disposed state to the caller. Since the shutdown or dispose can happen from any thread, the schedule implementations should make best effort to cancel tasks immediately after those tasks have been submitted to the underlying task-execution scheme if the shutdown/dispose was detected after this submission.

All methods on the Scheduler and Worker classes should be thread safe.
[中]调度器是一个对象,它指定了一个API,用于调度以可运行文件的形式提供的工作单元,以便在没有延迟的情况下(有效地尽快)执行,在指定的时间延迟或周期性延迟之后,表示异步边界上的抽象,该抽象确保这些工作单元由某个底层任务执行方案(如自定义线程、事件循环、java.util.concurrent.Executor或Actor系统)执行,该方案具有一些统一的属性和保证,而不考虑特定的基本方案。
您可以通过io的静态方法获得这个类的各种标准的、特定于RxJava的实例。reactivex。调度员。调度器实用程序类。
可以通过#createWorker()方法创建调度器的所谓工作进程,该方法允许以隔离的方式调度多个可运行任务。在辅助进程上调度的可运行任务保证按顺序以不重叠的方式执行。非延迟可运行任务保证以先进先出的顺序执行,但它们的执行可能与延迟任务交错。此外,可以通过Worker#dispose()一起取消未完成或正在运行的任务,而不会影响同一计划程序的任何其他Worker实例。
鼓励#scheduleDirect和Worker#schedule方法的实现调用io。reactivex。插件。RxJavaPlugins#onSchedule(Runnable)方法,允许调度器钩子在将原始可运行任务提交到基础任务执行方案之前操作(包装或替换)原始可运行任务。
此抽象类提供的scheduleDirect方法的默认实现将委托给通过#createWorker()为每个提交的可运行任务创建的Worker实例中的相应调度方法。鼓励此类实现人员提供更高效的直接调度实现,以避免为每个任务创建此类工作人员所需的时间和内存开销。在调用相应的Worker之前,该委托是通过围绕原始Runnable的特殊包装器实例完成的。计划方法。请注意,这可能会导致多个RxJava插件。onSchedule调用和可能应用的多个钩子。因此,scheduleDirect的默认实现(以及Worker#SchedulePeriodic(Runnable,long,long,TimeUnit))将传入的Runnable封装到一个实现io的类中。reactivex。调度员。SchedulerRunnableIntrospectioninterface,它可以授予对原始或钩住的Runnable的访问权限,从而获得重复的RxJavaPlugins。OnSchedule可以检测早期的钩子,而不会再次应用新的钩子。
#now(TimeUnit)和Worker#now(TimeUnit)方法的默认实现,用于以所需时间单位返回当前系统#currentTimeMillis()值。定制的调度器实现可以覆盖这一点,以提供专门的时间核算(例如以编程方式提前的虚拟时间)。请注意,需要调度器的操作员可能会分别依赖调度器或Worker提供的now()调用,因此,建议它们表示逻辑上一致的当前时间源。
Worker#SchedulePeriodic(Runnable,long,long,TimeUnit)方法的默认实现使用Worker#schedule(Runnable,long,TimeUnit)定期调度可运行任务。该算法计算下一次任务应该再次运行的绝对时间,并根据任务与Worker#now(时间单位)之间的相对时间安排执行。然而,系统时钟的漂移或变化可能会通过安排后续运行过于频繁或间隔过远而影响此计算。因此,默认实现使用#clockDriftTolerance()值(通过rx2.scheduler.drift-tolerance设置,以分钟为单位)检测Worker#now(时间单位)中的漂移,并相应地重新调整绝对/相对时间计算。
#start()和#shutdown()的默认实现不起任何作用,如果底层任务执行方案支持停止和重新启动自身,则应该覆盖它们。
如果调度程序关闭或工作程序被释放,调度方法应返回io。reactivex。一次性的。Disposables#disposed()单个实例,向调用者指示关闭/释放状态。由于关机或dispose可以从任何线程发生,因此如果在提交后检测到关机/dispose,则在将这些任务提交到基础任务执行方案后,计划实现应尽最大努力立即取消这些任务。
调度器类和工作类上的所有方法都应该是线程安全的。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(final Subscriber<? super T> s) {
  Scheduler.Worker w = scheduler.createWorker();
  final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
  s.onSubscribe(sos);
  w.schedule(sos);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSuccess(T value) {
  this.value = value;
  Disposable d = scheduler.scheduleDirect(this);
  DisposableHelper.replace(this, d);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Timed<T> apply(T t) throws Exception {
    return new Timed<T>(t, scheduler.now(unit), unit);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Subscriber<? super Long> s) {
  IntervalSubscriber is = new IntervalSubscriber(s);
  s.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

s.shutdown();
s.shutdown();
assertEquals(Disposables.disposed(), s.scheduleDirect(r));
assertEquals(Disposables.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS));
assertEquals(Disposables.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS));
Worker w = s.createWorker();
w.dispose();

代码示例来源:origin: ReactiveX/RxJava

@Test
public void reuseScheduledExecutor() throws Exception {
  ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  try {
    Scheduler s = Schedulers.from(exec);
    final CountDownLatch cdl = new CountDownLatch(8);
    Runnable r = new Runnable() {
      @Override
      public void run() {
        cdl.countDown();
      }
    };
    s.scheduleDirect(r);
    s.scheduleDirect(r, 10, TimeUnit.MILLISECONDS);
    Disposable d = s.schedulePeriodicallyDirect(r, 10, 10, TimeUnit.MILLISECONDS);
    try {
      assertTrue(cdl.await(5, TimeUnit.SECONDS));
    } finally {
      d.dispose();
    }
  } finally {
    exec.shutdown();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Subscription s) {
  if (SubscriptionHelper.validate(this.upstream, s)) {
    this.upstream = s;
    downstream.onSubscribe(this);
    timer.replace(scheduler.schedulePeriodicallyDirect(this, period, period, unit));
    s.request(Long.MAX_VALUE);
  }
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Shuts down the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void shutdown() {
  computation().shutdown();
  io().shutdown();
  newThread().shutdown();
  single().shutdown();
  trampoline().shutdown();
  SchedulerPoolFactory.shutdown();
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Starts the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void start() {
  computation().start();
  io().start();
  newThread().start();
  single().start();
  trampoline().start();
  SchedulerPoolFactory.start();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void shutdownRejects() {
  final int[] calls = { 0 };
  Runnable r = new Runnable() {
    @Override
    public void run() {
      calls[0]++;
    }
  };
  Scheduler s = new SingleScheduler();
  s.shutdown();
  assertEquals(Disposables.disposed(), s.scheduleDirect(r));
  assertEquals(Disposables.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS));
  assertEquals(Disposables.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS));
  Worker w = s.createWorker();
  ((ScheduledWorker)w).executor.shutdownNow();
  assertEquals(Disposables.disposed(), w.schedule(r));
  assertEquals(Disposables.disposed(), w.schedule(r, 1, TimeUnit.SECONDS));
  assertEquals(Disposables.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS));
  assertEquals(0, calls[0]);
  w.dispose();
  assertTrue(w.isDisposed());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Subscriber<? super Long> s) {
  IntervalRangeSubscriber is = new IntervalRangeSubscriber(s, start, end);
  s.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rejectingExecutor() {
  ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  exec.shutdown();
  Scheduler s = Schedulers.from(exec);
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE));
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE, 10, TimeUnit.MILLISECONDS));
    assertSame(EmptyDisposable.INSTANCE, s.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 10, 10, TimeUnit.MILLISECONDS));
    TestHelper.assertUndeliverable(errors, 0, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 1, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 2, RejectedExecutionException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    disposable.set(scheduler.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 10000, TimeUnit.MILLISECONDS));
  }
});

代码示例来源:origin: redisson/redisson

/**
 * Shuts down the standard Schedulers.
 * <p>The operation is idempotent and thread-safe.
 */
public static void shutdown() {
  computation().shutdown();
  io().shutdown();
  newThread().shutdown();
  single().shutdown();
  trampoline().shutdown();
  SchedulerPoolFactory.shutdown();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    s.start();
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Subscriber<? super T> s) {
  source.subscribe(new ThrottleLatestSubscriber<T>(s, timeout, unit, scheduler.createWorker(), emitLast));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSuccess(T value) {
  this.value = value;
  DisposableHelper.replace(this, scheduler.scheduleDirect(this));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(Subscription s) {
  if (SubscriptionHelper.validate(this.upstream, s)) {
    lastTime = scheduler.now(unit);
    this.upstream = s;
    downstream.onSubscribe(this);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Observer<? super Long> observer) {
  IntervalRangeObserver is = new IntervalRangeObserver(observer, start, end);
  observer.onSubscribe(is);
  Scheduler sch = scheduler;
  if (sch instanceof TrampolineScheduler) {
    Worker worker = sch.createWorker();
    is.setResource(worker);
    worker.schedulePeriodically(is, initialDelay, period, unit);
  } else {
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rejectingExecutor() {
  ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  exec.shutdown();
  Scheduler s = Schedulers.from(exec, true);
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE));
    assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE, 10, TimeUnit.MILLISECONDS));
    assertSame(EmptyDisposable.INSTANCE, s.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 10, 10, TimeUnit.MILLISECONDS));
    TestHelper.assertUndeliverable(errors, 0, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 1, RejectedExecutionException.class);
    TestHelper.assertUndeliverable(errors, 2, RejectedExecutionException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

相关文章