io.reactivex.Scheduler.scheduleDirect()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(5.2k)|赞(0)|评价(0)|浏览(171)

本文整理了Java中io.reactivex.Scheduler.scheduleDirect()方法的一些代码示例,展示了Scheduler.scheduleDirect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Scheduler.scheduleDirect()方法的具体详情如下:
包路径:io.reactivex.Scheduler
类名称:Scheduler
方法名:scheduleDirect

Scheduler.scheduleDirect介绍

[英]Schedules the given task on this Scheduler without any time delay.

This method is safe to be called from multiple threads but there are no ordering or non-overlapping guarantees between tasks.
[中]在此计划程序上计划给定的任务,没有任何时间延迟。
从多个线程调用此方法是安全的,但任务之间没有顺序或不重叠的保证。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
public void dispose() {
  Disposable d = getAndSet(DisposableHelper.DISPOSED);
  if (d != DisposableHelper.DISPOSED) {
    this.ds = d;
    scheduler.scheduleDirect(this);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onError(Throwable e) {
  this.error = e;
  Disposable d = scheduler.scheduleDirect(this);
  DisposableHelper.replace(this, d);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSuccess(T value) {
  this.value = value;
  DisposableHelper.replace(this, scheduler.scheduleDirect(this));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onError(Throwable e) {
  this.error = e;
  DisposableHelper.replace(this, scheduler.scheduleDirect(this));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void dispose() {
  if (compareAndSet(false, true)) {
    scheduler.scheduleDirect(new DisposeTask());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onSubscribe(final Disposable d) {
  Schedulers.single().scheduleDirect(new Runnable() {
    @Override
    public void run() {
      d.dispose();
    }
  }, 550, TimeUnit.MILLISECONDS);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(final CompletableObserver observer) {
  TimerDisposable parent = new TimerDisposable(observer);
  observer.onSubscribe(parent);
  parent.setFuture(scheduler.scheduleDirect(parent, delay, unit));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(final SingleObserver<? super Long> observer) {
  TimerDisposable parent = new TimerDisposable(observer);
  observer.onSubscribe(parent);
  parent.setFuture(scheduler.scheduleDirect(parent, delay, unit));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
  final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
  observer.onSubscribe(parent);
  Disposable f = scheduler.scheduleDirect(parent);
  parent.task.replace(f);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribeActual(Observer<? super Long> observer) {
  TimerObserver ios = new TimerObserver(observer);
  observer.onSubscribe(ios);
  Disposable d = scheduler.scheduleDirect(ios, delay, unit);
  ios.setResource(d);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
  SubscribeOnMaybeObserver<T> parent = new SubscribeOnMaybeObserver<T>(observer);
  observer.onSubscribe(parent);
  parent.task.replace(scheduler.scheduleDirect(new SubscribeTask<T>(parent, source)));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
  TimeoutMainObserver<T> parent = new TimeoutMainObserver<T>(observer, other, timeout, unit);
  observer.onSubscribe(parent);
  DisposableHelper.replace(parent.task, scheduler.scheduleDirect(parent, timeout, unit));
  source.subscribe(parent);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void scheduleDirectDelayed() throws Exception {
  Scheduler s = getScheduler();
  final CountDownLatch cdl = new CountDownLatch(1);
  s.scheduleDirect(new Runnable() {
    @Override
    public void run() {
      cdl.countDown();
    }
  }, 50, TimeUnit.MILLISECONDS);
  assertTrue(cdl.await(5, TimeUnit.SECONDS));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 1000)
public void runnableDisposedAsyncTimed() throws Exception {
  final Scheduler s = Schedulers.single();
  Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
  while (!d.isDisposed()) {
    Thread.sleep(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void completeAsync() throws Exception {
  Schedulers.single().scheduleDirect(new Runnable() {
    @Override
    public void run() {
      fo.onNext(1);
      fo.onComplete();
    }
  }, 500, TimeUnit.MILLISECONDS);
  assertEquals(1, fo.get().intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void await() throws Exception {
  Schedulers.single().scheduleDirect(new Runnable() {
    @Override
    public void run() {
      fs.onNext(1);
      fs.onComplete();
    }
  }, 100, TimeUnit.MILLISECONDS);
  assertEquals(1, fs.get(5, TimeUnit.SECONDS).intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void scheduleDirectWithDelayNullRunnable() {
  try {
    getScheduler().scheduleDirect(null, 10, TimeUnit.MILLISECONDS);
    fail();
  } catch (NullPointerException npe) {
    assertEquals("run is null", npe.getMessage());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void completeAsync() throws Exception {
  Schedulers.single().scheduleDirect(new Runnable() {
    @Override
    public void run() {
      fs.onNext(1);
      fs.onComplete();
    }
  }, 500, TimeUnit.MILLISECONDS);
  assertEquals(1, fs.get().intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void blockingGetDefault() {
  final BlockingMultiObserver<Integer> bmo = new BlockingMultiObserver<Integer>();
  Schedulers.single().scheduleDirect(new Runnable() {
    @Override
    public void run() {
      bmo.onSuccess(1);
    }
  }, 100, TimeUnit.MILLISECONDS);
  assertEquals(1, bmo.blockingGet(0).intValue());
}

相关文章