Usage and code examples of the rx.Single.toObservable() method

x33g5p2x, reposted on 2022-01-29 under "Other"

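This article collects code examples for the Java method rx.Single.toObservable() and shows how Single.toObservable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are meant as a practical reference. Details of the Single.toObservable() method:
Package path: rx.Single
Class name: Single
Method name: toObservable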

About Single.toObservable

No description available.
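Since the page gives no description, here is a rough sketch of the contract as it is generally understood for RxJava 1.x: a Single delivers exactly one value or one error, and toObservable() exposes it as an Observable that emits that value through onNext followed by onCompleted, or relays the error through onError. The plain-Java model below illustrates that event sequence without needing the library on the classpath. MiniSingle, MiniObservable, MiniObserver, and record are hypothetical names invented for this illustration; they are not RxJava classes and do not reflect how RxJava implements the conversion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class SingleToObservableSketch {

    /** Hypothetical stand-in for rx.Observer: receives items, then one terminal event. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for rx.Observable: pushes events to an observer on subscribe. */
    interface MiniObservable<T> {
        void subscribe(MiniObserver<T> observer);
    }

    /** Hypothetical stand-in for rx.Single: produces exactly one value or one error. */
    static final class MiniSingle<T> {
        private final Supplier<T> source;

        MiniSingle(Supplier<T> source) {
            this.source = source;
        }

        /** Models the conversion: value -> onNext + onCompleted, failure -> onError. */
        MiniObservable<T> toObservable() {
            return observer -> {
                T value;
                try {
                    value = source.get();
                } catch (RuntimeException e) {
                    observer.onError(e);
                    return;
                }
                observer.onNext(value);
                observer.onCompleted();
            };
        }
    }

    /** Subscribes and records the events as strings so their order is easy to inspect. */
    static <T> List<String> record(MiniObservable<T> observable) {
        List<String> events = new ArrayList<>();
        observable.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T item) { events.add("onNext(" + item + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // Successful single: one item, then completion.
        System.out.println(record(new MiniSingle<Integer>(() -> 1).toObservable()));
        // Failing single: only the error is relayed.
        System.out.println(record(new MiniSingle<Integer>(() -> {
            throw new IllegalStateException("boom");
        }).toObservable()));
    }
}
```

With the real library, the equivalent call would be along the lines of Single.just(1).toObservable(), as in the project snippets that follow.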

Code examples

Code example source: origin: lettuce-io/lettuce-core

@Override
public Observable<?> apply(Single<?> source) {
  // Adapt the Single to an Observable
  return source.toObservable();
}

Code example source: origin: PipelineAI/pipeline

// Normalizes any supported Rx type (Observable, Completable, Single) to an Observable
private Observable toObservable(Object obj) {
  if (Observable.class.isAssignableFrom(obj.getClass())) {
    return (Observable) obj;
  } else if (Completable.class.isAssignableFrom(obj.getClass())) {
    return ((Completable) obj).toObservable();
  } else if (Single.class.isAssignableFrom(obj.getClass())) {
    return ((Single) obj).toObservable();
  } else {
    throw new IllegalStateException("unsupported rx type: " + obj.getClass());
  }
}

Code example source: origin: ReactiveX/RxNetty

@Override
  public Observable<List<HostHolder<W, R>>> call(HostUpdate<W, R> holder) {
    return f.call(holder).toObservable();
  }
};

Code example source: origin: vert-x3/vertx-examples

private void insertAndFind() {
  // Documents to insert
  Observable<JsonObject> documents = Observable.just(
   new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
   new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
  );

  mongo.rxCreateCollection("users").flatMapObservable(v -> {
   // After collection is created we insert each document
   return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
  }).doOnNext(id -> {
   System.out.println("Inserted document " + id);
  }).last().toSingle().flatMap(id -> {
   // Everything has been inserted now we can query mongo
   System.out.println("Insertions done");
   return mongo.rxFind("users", new JsonObject());
  }).subscribe(results -> {
   System.out.println("Results " + results);
  }, error -> {
   System.out.println("Err");
   error.printStackTrace();
  });
 }
}

Code example source: origin: PipelineAI/pipeline

return (Observable) res;
} else if (res instanceof Single) {
  return ((Single) res).toObservable();
} else if (res instanceof Completable) {
  return ((Completable) res).toObservable();

Code example source: origin: Netflix/EVCache

fbClient -> getData(fbClients.indexOf(fbClient), fbClients.size(), fbClient, canonicalKey, tc, throwEx, throwExc, false, scheduler) //TODO : for the last one make sure to pass throwExc
.doOnSuccess(fbData -> increment(fbClient.getServerGroupName(), _cacheName, "RETRY_" + ((fbData == null) ? "MISS" : "HIT")))
.toObservable()))
.firstOrDefault(null, fbData -> (fbData != null)).toSingle();

Code example source: origin: io.lettuce/lettuce-core

@Override
public Observable<?> apply(Single<?> source) {
  // Adapt the Single to an Observable
  return source.toObservable();
}

Code example source: origin: apache/servicemix-bundles

@Nonnull
@Override
public Observable<?> convert(Single<?> source) {
  // Adapt the Single to an Observable
  return source.toObservable();
}

Code example source: origin: com.vmware.card-connectors/connectors-common

private void initSingle(Single<T> single) {
    Assert.notNull(single, "single can not be null");
    new DeferredResultSubscriber<>(single.toObservable(), this);
  }
}

Code example source: origin: quebic-source/microservices-sample-project

public SingleDeferredResult(
      Long timeout
      , Object timeoutResult
      , Single<T> single
      , MultiValueMap<String, String> headers
      , HttpStatus status) {
    super(timeout, timeoutResult);
    new DeferredResultWriter<T>(single.toObservable(), this, headers, status);
  }
}

Code example source: origin: FranRiadigos/InterviewTest

/**
 * We leave the repository the responsibility of the request, and once we get the List of items
 * we just return the number of them.
 *
 * @param params The Params.
 * @return Observable of the number of Comments
 */
@Override
public Observable<Integer> provideObservable(Params params) {
  if(params == null) return Observable.error(new NullParameterException(Params.class));
  return this.postRepository.getComments(params.postId).toObservable().map(List::size);
}

Code example source: origin: sczyh30/vertx-blueprint-microservice

@Override
 public void stop(Future<Void> future) throws Exception {
  // TODO: to optimize.
  Observable.from(registeredRecords)
   .flatMap(record -> discovery.rxUnpublish(record.getRegistration()).toObservable())
   .reduce((Void) null, (a, b) -> null)
   .subscribe(future::complete, future::fail);
 }
}

Code example source: origin: com.couchbase.client/core-io

@Override
public Observable<EndpointHealth> diagnostics() {
  List<Observable<EndpointHealth>> diags = new ArrayList<Observable<EndpointHealth>>();
  for (Endpoint endpoint : endpoints()) {
    diags.add(endpoint.diagnostics(type()).toObservable());
  }
  return Observable.merge(diags);
}

Code example source: origin: couchbase/couchbase-jvm-core

@Override
public Observable<EndpointHealth> diagnostics() {
  List<Observable<EndpointHealth>> diags = new ArrayList<Observable<EndpointHealth>>();
  for (Endpoint endpoint : endpoints()) {
    diags.add(endpoint.diagnostics(type()).toObservable());
  }
  return Observable.merge(diags);
}

Code example source: origin: jacek-marchwicki/JavaWebsocketClient

@Override
  public Observable<?> call(RxObjectEventConnected rxEventConn) {
    return RxMoreObservables.sendObjectMessage(rxEventConn.sender(), new RegisterMessage("asdf"))
        .toObservable();
  }
})

Code example source: origin: com.microsoft.azure/azure-cosmosdb-gateway

static public <T> Observable<T> inlineIfPossibleAsObs(Func0<Observable<T>> function, IRetryPolicy retryPolicy) {

    if (retryPolicy == null) {
      // shortcut
      return Observable.defer(() -> {
        return function.call();
      });

    } else {
      return BackoffRetryUtility.executeRetry(() -> function.call().toSingle(), retryPolicy).toObservable();
    }
  }
}

Code example source: origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_138() throws Exception {
  Single<Integer> ignored = Single
      .just(1)
      .toObservable()
      .ignoreElements()   //PROBLEM: drops the only item, so toSingle() has no element to emit
      .toSingle();
}

Code example source: origin: akarnokd/akarnokd-misc

@Test
public void test() throws Exception {
  Observable.fromCallable(() -> { throw new IOException(); })
  .toSingle()
  .subscribeOn(Schedulers.computation())
  .toObservable()
  .toSingle()
  .onErrorResumeNext(v -> Single.just(1))
  .subscribe(System.out::println, Throwable::printStackTrace);
  Thread.sleep(1000);
}
