rx.subjects.Subject.subscribe()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(5.3k)|赞(0)|评价(0)|浏览(110)

本文整理了Java中rx.subjects.Subject.subscribe()方法的一些代码示例,展示了Subject.subscribe()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Subject.subscribe()方法的具体详情如下:
包路径:rx.subjects.Subject
类名称:Subject
方法名:subscribe

Subject.subscribe介绍

暂无

代码示例

代码示例来源:origin: com.netflix.eureka/eureka2-core

// Subscription callback: subscribes each incoming subscriber to ackSubject.
@Override
  public void call(Subscriber<? super Void> subscriber) {
    ackSubject.subscribe(subscriber);
  }
};

代码示例来源:origin: com.netflix.eureka/eureka2-core

// Subscription callback: subscribes each incoming subscriber to notificationSubject.
@Override
  public void call(Subscriber<? super T> subscriber) {
    notificationSubject.subscribe(subscriber);
  }
}, notificationSubject);

代码示例来源:origin: akarnokd/akarnokd-misc

/**
 * Subscribes the given observer to {@code transactionSubject}.
 *
 * @param transactionObserver observer to attach to the subject
 * @return the subscription returned by {@code transactionSubject.subscribe(...)}
 */
public Subscription subscribe(Observer<TestTransaction> transactionObserver) {
 return transactionSubject.subscribe(transactionObserver);
}

代码示例来源:origin: bravekingzhang/CleanArch

/**
 * Subscribes a listener to the bus and registers the resulting subscription with the
 * supplied composite.
 *
 * @param subscription composite that receives the bus subscription
 *                     (for example a new CompositeSubscription())
 * @param eventLisener listener whose {@code dealRxEvent} is called with each bus event
 */
public void subscribe(@NonNull CompositeSubscription subscription,@NonNull final EventLisener eventLisener){
  // Adapter that hands every bus event to the listener.
  final Action1<Object> forwardToListener = new Action1<Object>() {
    @Override
    public void call(Object busEvent) {
      eventLisener.dealRxEvent(busEvent);
    }
  };
  subscription.add(_bus.subscribe(forwardToListener));
}

代码示例来源:origin: com.netflix.eureka/eureka2-core

// Subscription callback: subscribes the caller to realTimeSubject merged with an
// Observable built from initStateHolder. realTimeSubject is paused up front and
// resumed once the initStateHolder Observable completes.
@Override
  public void call(Subscriber<? super ChangeNotification<T>> subscriber) {
    // We need to buffer all the changes while the init state is replayed.
    // Because new instance holder updates will be added while we replay them, they will be
    // partially visible by the subscriber. When we replay buffered real time updates,
    // they may overlap with what was already sent from the init holder.
    // TODO can we make this buffering cheaper?
    final PauseableSubject<ChangeNotification<T>> realTimeSubject = PauseableSubject.create();
    // pause() is called before the subject is attached to the real-time source.
    realTimeSubject.pause();
    realTimeSource.subscribe(realTimeSubject);
    // resume() runs in the doOnCompleted hook of the initStateHolder Observable.
    realTimeSubject.mergeWith(Observable.from(initStateHolder).doOnCompleted(new Action0() {
      @Override
      public void call() {
        realTimeSubject.resume();
      }
    })).subscribe(subscriber);
  }
});

代码示例来源:origin: couchbase/java-dcp-client

/**
 * Creates the provider with {@code LifecycleState.DISCONNECTED} passed to the superclass
 * and attaches a subscriber to the config stream that replaces {@code remoteHosts} with
 * the node addresses of every config it receives.
 *
 * @param env client environment; supplies the initial cluster addresses and the SSL flag
 */
public HttpStreamingConfigProvider(final ClientEnvironment env) {
  super(LifecycleState.DISCONNECTED);
  this.env = env;
  this.remoteHosts = new AtomicReference<>(env.clusterAt());
  this.configStream = BehaviorSubject.<CouchbaseBucketConfig>create().toSerialized();

  final Subscriber<CouchbaseBucketConfig> hostListUpdater = new Subscriber<CouchbaseBucketConfig>() {
    @Override
    public void onNext(CouchbaseBucketConfig config) {
      List<InetSocketAddress> configHosts = new ArrayList<>();
      for (NodeInfo node : config.nodes()) {
        // Pick the CONFIG service port from the SSL or the plain service map.
        // NOTE(review): if the chosen map has no CONFIG entry, unboxing configPort
        // below would throw — confirm every node always exposes this service.
        Integer configPort;
        if (env.sslEnabled()) {
          configPort = node.sslServices().get(ServiceType.CONFIG);
        } else {
          configPort = node.services().get(ServiceType.CONFIG);
        }
        configHosts.add(new InetSocketAddress(node.rawHostname(), configPort));
      }
      LOGGER.trace("Updated config stream node list to {}.", configHosts);
      remoteHosts.set(configHosts);
    }

    @Override
    public void onError(Throwable e) {
      LOGGER.warn("Error on config stream!", e);
    }

    @Override
    public void onCompleted() {
      LOGGER.debug("Config stream completed.");
    }
  };
  configStream.subscribe(hostListUpdater);
}

代码示例来源:origin: yahoo/fili

// Attach the response-length observer to the broadcaster before the stream is created.
lengthBroadcaster.subscribe(responseLengthObserver);
// Wrap the response entity stream; the wrapper is handed the same broadcaster
// (presumably to publish the number of bytes written — confirm in LengthOfOutputStream).
OutputStream stream = new LengthOfOutputStream(response.getEntityStream(), lengthBroadcaster);

代码示例来源:origin: com.couchbase.client/core-io

/**
 * Subscription callback. If the state can be moved from UNSUBSCRIBED to SUBSCRIBED, the
 * subscriber is attached to the buffered subject; if the state is already SUBSCRIBED or
 * DISPOSED, the subscriber is rejected through {@code onError} with an
 * {@link IllegalStateException}. In any other state nothing happens.
 */
@Override
  public void call(final Subscriber<? super T> subscriber) {
    if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.SUBSCRIBED)) {
      // Hook that runs the optional onUnsubscribe action when the subscriber unsubscribes.
      final Action0 notifyUnsubscribe = new Action0() {
        @Override
        public void call() {
          if (state.onUnsubscribe != null) {
            state.onUnsubscribe.call();
          }
        }
      };
      // Keep this order: register the hook, attach to the subject, then cancel the timeout.
      subscriber.add(Subscriptions.create(notifyUnsubscribe));
      state.bufferedSubject.subscribe(subscriber);
      state.unsubscribeTimeoutSubscription();
    } else if (State.STATES.SUBSCRIBED.ordinal() == state.state) {
      // A second subscriber is not allowed; include the trace id when one is set.
      final String prefix = state.traceId == null
          ? "This Observable "
          : "This Observable (" + state.traceId + ") ";
      subscriber.onError(new IllegalStateException(prefix + "can only have one subscription. "
        + "Use Observable.publish() if you want to multicast."));
    } else if (State.STATES.DISPOSED.ordinal() == state.state) {
      // The content has already been released; include the trace id when one is set.
      final String prefix = state.traceId == null
          ? "The content of this Observable "
          : "The content of this Observable (" + state.traceId + ") ";
      subscriber.onError(new IllegalStateException(prefix + "is already released. "
        + "Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting."));
    }
  }
}

代码示例来源:origin: couchbase/couchbase-jvm-core

/**
 * Subscription callback. If the state can be moved from UNSUBSCRIBED to SUBSCRIBED, the
 * subscriber is attached to the buffered subject; if the state is already SUBSCRIBED or
 * DISPOSED, the subscriber is rejected through {@code onError} with an
 * {@link IllegalStateException}. In any other state nothing happens.
 */
@Override
  public void call(final Subscriber<? super T> subscriber) {
    if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.SUBSCRIBED)) {
      // Hook that runs the optional onUnsubscribe action when the subscriber unsubscribes.
      final Action0 notifyUnsubscribe = new Action0() {
        @Override
        public void call() {
          if (state.onUnsubscribe != null) {
            state.onUnsubscribe.call();
          }
        }
      };
      // Keep this order: register the hook, attach to the subject, then cancel the timeout.
      subscriber.add(Subscriptions.create(notifyUnsubscribe));
      state.bufferedSubject.subscribe(subscriber);
      state.unsubscribeTimeoutSubscription();
    } else if (State.STATES.SUBSCRIBED.ordinal() == state.state) {
      // A second subscriber is not allowed; include the trace id when one is set.
      final String prefix = state.traceId == null
          ? "This Observable "
          : "This Observable (" + state.traceId + ") ";
      subscriber.onError(new IllegalStateException(prefix + "can only have one subscription. "
        + "Use Observable.publish() if you want to multicast."));
    } else if (State.STATES.DISPOSED.ordinal() == state.state) {
      // The content has already been released; include the trace id when one is set.
      final String prefix = state.traceId == null
          ? "The content of this Observable "
          : "The content of this Observable (" + state.traceId + ") ";
      subscriber.onError(new IllegalStateException(prefix + "is already released. "
        + "Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting."));
    }
  }
}

相关文章