rx.subjects.Subject类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(7.0k)|赞(0)|评价(0)|浏览(109)

本文整理了Java中rx.subjects.Subject类的一些代码示例,展示了Subject类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Subject类的具体详情如下:
包路径:rx.subjects.Subject
类名称:Subject

Subject介绍

[英]Represents an object that is both an Observable and an Observer.
[中]表示既是可观察对象又是观察者的对象。

代码示例

代码示例来源:origin: PipelineAI/pipeline

/**
 * Publishes a command-execution-started event by pushing it into {@code writeOnlySubject}.
 *
 * @param event the event handed to the subject's {@code onNext}
 */
public void write(HystrixCommandExecutionStarted event) {
  writeOnlySubject.onNext(event);
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * Signals completion on the three write-only subjects (command start, command completion,
 * collapser), in that order.
 */
public void shutdown() {
  writeOnlyCommandStartSubject.onCompleted();
  writeOnlyCommandCompletionSubject.onCompleted();
  writeOnlyCollapserSubject.onCompleted();
}

代码示例来源:origin: HotBitmapGG/bilibili-android-client

/**
   * Receives messages: delegates to {@code bus.ofType(eventType)} to obtain an Observable
   * restricted to the requested event type.
   *
   * @param eventType class object passed to {@code ofType}
   * @param <T> the event type
   * @return the Observable returned by {@code bus.ofType(eventType)}
   */
  public <T> Observable<T> toObserverable(Class<T> eventType) {
    return bus.ofType(eventType);
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

/**
 * Exercises {@code timeout} with a fallback observable derived from the same shared subject.
 * Emits 5, waits longer than the one-second timeout, emits 10, completes, and then expects
 * the subscriber to have seen 5 and 10 without errors.
 */
@Test
public void test2() throws Exception {
  Subject<Long, Long> subject = PublishSubject.create();
  // Shared view of the subject; each value is logged together with a stack trace
  // (printed to stdout) so the delivery path can be inspected.
  Observable<Long> initialObservable = subject.share()
  .map(value -> {
    System.out.println("Received value " + value);
    new Exception().printStackTrace(System.out);
    return value;
  });
  // Fallback passed to timeout(); it is built on top of initialObservable and only adds logging.
  Observable<Long> timeoutObservable = initialObservable.map(value -> {
    System.out.println("Timeout received value " + value);
    return value;
  });
  TestSubscriber<Long> subscriber = new TestSubscriber<>();
  initialObservable
  .doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
  .timeout(1, TimeUnit.SECONDS, timeoutObservable).subscribe(subscriber);
  subject.onNext(5L);
  // Sleep past the 1 s timeout; presumably the switch to timeoutObservable happens here
  // before 10L is emitted -- confirm against the timeout operator's documentation.
  Thread.sleep(1500);
  subject.onNext(10L);
  subject.onCompleted();
  subscriber.awaitTerminalEvent();
  subscriber.assertNoErrors();
  subscriber.assertValues(5L, 10L);
}

代码示例来源:origin: com.couchbase.client/core-io

/**
 * Forwards the item to {@code state.bufferedSubject} and, when
 * {@code state.casTimeoutScheduled()} returns true while the state equals the ordinal of
 * {@code UNSUBSCRIBED}, subscribes to {@code timeoutScheduler} and stores the resulting
 * subscription on the state.
 *
 * @param t the item to forward
 */
@Override
public void onNext(T t) {
  state.bufferedSubject.onNext(t);
  // Schedule timeout once and when not subscribed yet.
  // NOTE(review): casTimeoutScheduled() is presumably a compare-and-set that succeeds only
  // on the first call -- confirm in the State class.
  if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
    state.setTimeoutSubscription(timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
      @Override
      public void call(Long aLong) {
        // The emitted value is ignored; the timer firing is the only signal needed.
        disposeIfNotSubscribed();
      }
    }));
  }
}

代码示例来源:origin: laotan7237/EasyReader

/**
 * Returns an observable of the payloads carried by {@code RxBusBaseMessage} items on the bus
 * whose code equals {@code code} and whose payload is an instance of {@code eventType}.
 * For example, an observer registered with code 0 and class voidMessage will not receive
 * voidMessage events that carry a code other than 0.
 *
 * @param code event code to match
 * @param eventType event type to match
 * @param <T> payload type emitted by the returned observable
 * @return observable of matching payloads, cast to {@code eventType}
 */
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
  // Accept only messages whose code and payload type both match.
  final Func1<RxBusBaseMessage, Boolean> matchesCodeAndType =
      new Func1<RxBusBaseMessage, Boolean>() {
        @Override
        public Boolean call(RxBusBaseMessage message) {
          if (message.getCode() != code) {
            return false;
          }
          return eventType.isInstance(message.getObject());
        }
      };
  // Unwrap the payload from its message envelope.
  final Func1<RxBusBaseMessage, Object> extractPayload =
      new Func1<RxBusBaseMessage, Object>() {
        @Override
        public Object call(RxBusBaseMessage message) {
          return message.getObject();
        }
      };
  return _bus.ofType(RxBusBaseMessage.class)
      .filter(matchesCodeAndType)
      .map(extractPayload)
      .cast(eventType);
}
/**

代码示例来源:origin: com.netflix.eureka/eureka2-client

@Override
public Observable<Boolean> register(InstanceInfo instanceInfo, Source source) {
  relay.onNext(new ChangeNotification<>(ChangeNotification.Kind.Add ,instanceInfo));
  return Observable.just(true);
}

代码示例来源:origin: akarnokd/akarnokd-misc

/**
 * Builds a transformer that flat-maps each upstream item through {@code mapper} and stops
 * the inner observable created for the item with sequence number {@code id} once the
 * sequence number {@code id + n} is announced on an internal cancel subject.
 *
 * @param n offset between an inner observable's own id and the id that terminates it
 * @param mapper function producing the inner observable for each upstream item
 * @param <T> upstream item type
 * @param <R> result item type
 * @return the transformer; its state is created per subscription via {@code Observable.defer}
 */
public static <T, R> Observable.Transformer<T, R> switchFlatMap(
      int n, Func1<T, Observable<R>> mapper) {
    return f ->
      Observable.defer(() -> {
        // Per-subscription state: a counter handing out ids and a serialized subject
        // that broadcasts each newly assigned id.
        final AtomicInteger ingress = new AtomicInteger();
        final Subject<Integer, Integer> cancel =
            PublishSubject.<Integer>create().toSerialized();
        return f.flatMap(v -> {
          int id = ingress.getAndIncrement();
          // This inner observable ends when the item with id + n is announced.
          Observable<R> o = mapper.call(v)
              .takeUntil(cancel.filter(e -> e == id + n));
          // Announce the current id to the cancel subject before returning the inner source.
          cancel.onNext(id);
          return o;
        });
      })
    ;
  }
}

代码示例来源:origin: com.netflix.eureka/eureka2-client

@Override
public Observable<Void> shutdown() {
  remoteBatchingRegistry.shutdown();
  relay.onCompleted();
  return Observable.empty();
}

代码示例来源:origin: com.netflix.eureka/eureka2-client

/**
 * Shuts down {@code remoteBatchingRegistry}, signals the given error on {@code relay}, and
 * returns an empty observable.
 *
 * @param cause error passed to {@code relay.onError}
 * @return {@code Observable.empty()}
 */
@Override
  public Observable<Void> shutdown(Throwable cause) {
    remoteBatchingRegistry.shutdown();
    relay.onError(cause);
    return Observable.empty();
  }
}

代码示例来源:origin: akarnokd/RxJava2Interop

@Test
public void sj2ToSj1Normal() {
  io.reactivex.subjects.PublishSubject<Integer> ps2 = io.reactivex.subjects.PublishSubject.create();
  rx.subjects.Subject<Integer, Integer> sj1 = toV1Subject(ps2);
  rx.observers.AssertableSubscriber<Integer> to = sj1.test();
  assertTrue(sj1.hasObservers());
  assertTrue(ps2.hasObservers());
  sj1.onNext(1);
  sj1.onNext(2);
  sj1.onCompleted();
  assertFalse(sj1.hasObservers());
  assertFalse(ps2.hasObservers());
  to.assertResult(1, 2);
}

代码示例来源:origin: com.couchbase.client/core-io

/**
 * Emits {@code response} to {@code obs} followed by completion; any exception thrown while
 * doing so is routed to {@code obs.onError}. The worker is unsubscribed in all cases.
 */
@Override
  public void call() {
    try {
      obs.onNext(response);
      obs.onCompleted();
    } catch(Exception ex) {
      obs.onError(ex);
    } finally {
      // Always release the worker, whether emission succeeded or failed.
      worker.unsubscribe();
    }
  }
});

代码示例来源:origin: akarnokd/RxJava2Interop

@Test
public void sj2ToSj1Lifecycle() {
  io.reactivex.subjects.PublishSubject<Integer> pp2 = io.reactivex.subjects.PublishSubject.create();
  rx.subjects.Subject<Integer, Integer> sj1 = toV1Subject(pp2);
  rx.observers.AssertableSubscriber<Integer> to = sj1.test(0L);
  assertTrue(sj1.hasObservers());
  assertTrue(pp2.hasObservers());
  sj1.onNext(1);
  sj1.onError(new IOException());
  assertFalse(sj1.hasObservers());
  assertFalse(pp2.hasObservers());
  assertFalse(pp2.hasComplete());
  assertTrue(pp2.hasThrowable());
  assertNotNull(pp2.getThrowable());
  to.assertFailure(rx.exceptions.MissingBackpressureException.class);
}

代码示例来源:origin: bravekingzhang/CleanArch

/**
 * Called by message senders. The object is pushed onto the bus only when the bus currently
 * reports observers; otherwise the call does nothing.
 *
 * @param o the message object to publish
 */
public void send(Object o) {
  if (!_bus.hasObservers()) {
    return;
  }
  _bus.onNext(o);
}

代码示例来源:origin: akarnokd/RxJava2Interop

@Test
public void sj2ToSj1Backpressured() {
  io.reactivex.subjects.PublishSubject<Integer> pp2 = io.reactivex.subjects.PublishSubject.create();
  rx.subjects.Subject<Integer, Integer> sj1 = toV1Subject(pp2);
  rx.observers.AssertableSubscriber<Integer> to = sj1.test(0L);
  assertTrue(sj1.hasObservers());
  assertTrue(pp2.hasObservers());
  sj1.onNext(1);
  assertFalse(sj1.hasObservers());
  assertFalse(pp2.hasObservers());
  assertFalse(pp2.hasComplete());
  assertFalse(pp2.hasThrowable());
  assertNull(pp2.getThrowable());
  to.assertFailure(rx.exceptions.MissingBackpressureException.class);
}

代码示例来源:origin: com.couchbase.client/core-io

/**
 * Feeds one stat response into the observable. A response with a {@code null} key ends the
 * stream: it is emitted only when its status is not successful, and the observable is then
 * completed. A response with a non-null key is simply emitted.
 *
 * @param response the stat response to process
 */
public void add(final StatResponse response) {
  final boolean isTerminator = response.key() == null;
  // Skip the NULL-terminator itself when it reports success; emit everything else.
  if (!isTerminator || !response.status().isSuccess()) {
    observable().onNext(response);
  }
  if (isTerminator) {
    observable().onCompleted();
  }
}

代码示例来源:origin: yahoo/fili

/**
 * Publishes the stream's response length to its length broadcaster and then signals the
 * given error on the same broadcaster.
 *
 * @param stream  Stream supplying both the broadcaster and the response length
 * @param t  Error to publish after the length
 */
private void emitError(LengthOfOutputStream stream, Throwable t) {
  final Subject<Long, Long> broadcaster = stream.getLengthBroadcaster();
  final Long responseLength = stream.getResponseLength();
  broadcaster.onNext(responseLength);
  broadcaster.onError(t);
}

代码示例来源:origin: akarnokd/RxJava2Interop

/**
 * Terminates this subject with an error. If it has already terminated, the throwable is
 * handed to the RxJava 2 plugin error handler instead. A {@code null} throwable is replaced
 * by a {@code NullPointerException} before it is recorded and forwarded to {@code source}.
 *
 * @param e the error to signal; may be {@code null}
 */
@Override
public void onError(Throwable e) {
  if (terminated) {
    io.reactivex.plugins.RxJavaPlugins.onError(e);
    return;
  }
  Throwable failure = e;
  if (failure == null) {
    failure = new NullPointerException("Throwable was null");
  }
  error = failure;
  terminated = true;
  source.onError(failure);
}

代码示例来源:origin: com.netflix.eureka/eureka2-core

/**
 * Ends the lifecycle subject: with an error when a throwable is supplied, otherwise with a
 * normal completion.
 *
 * @param e the failure to signal, or {@code null} for a normal completion
 */
private void terminateLifecycle(Throwable e) {
  if (e != null) {
    lifecycleSubject.onError(e);
    return;
  }
  lifecycleSubject.onCompleted();
}

代码示例来源:origin: cn-ljb/rxjava_for_android

/** Reports whether the bus has any observers, as answered by {@code _bus.hasObservers()}. */
  public boolean hasObservers() {
    return _bus.hasObservers();
  }
}

相关文章