rx.Single类的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(181)

本文整理了Java中rx.Single类的一些代码示例,展示了Single类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single类的具体详情如下:
包路径:rx.Single
类名称:Single

Single介绍

暂无

代码示例

代码示例来源:origin: vert-x3/vertx-examples

private void insertAndFind() {
  // Documents to insert
  Observable<JsonObject> documents = Observable.just(
   new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
   new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
  );

  mongo.rxCreateCollection("users").flatMapObservable(v -> {
   // After collection is created we insert each document
   return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
  }).doOnNext(id -> {
   System.out.println("Inserted document " + id);
  }).last().toSingle().flatMap(id -> {
   // Everything has been inserted now we can query mongo
   System.out.println("Insertions done");
   return mongo.rxFind("users", new JsonObject());
  }).subscribe(results -> {
   System.out.println("Results " + results);
  }, error -> {
   System.out.println("Err");
   error.printStackTrace();
  });
 }
}

代码示例来源:origin: Netflix/EVCache

public <T> Single<T> get(String key, Transcoder<T> tc, Scheduler scheduler) {
  if (null == key) return Single.error(new IllegalArgumentException("Key cannot be null"));
  if (client == null) {
    increment("NULL_CLIENT");
    return Single.error(new EVCacheException("Could not find a client to get the data APP " + _appName));
      if (shouldThrottle(event)) {
        increment("THROTTLED");
        return Single.error(new EVCacheException("Request Throttled for app " + _appName + " & key " + key));
  final boolean hasZF = hasZoneFallback();
  boolean throwEx = hasZF ? false : throwExc;
  return getData(client, canonicalKey, tc, throwEx, hasZF, scheduler).flatMap(data -> {
    if (data == null && hasZF) {
      final List<EVCacheClient> fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
      if (fbClients != null && !fbClients.isEmpty()) {
        return Observable.concat(Observable.from(fbClients).map(
            fbClient -> getData(fbClients.indexOf(fbClient), fbClients.size(), fbClient, canonicalKey, tc, throwEx, throwExc, false, scheduler) //TODO : for the last one make sure to pass throwExc
            .doOnSuccess(fbData -> increment(fbClient.getServerGroupName(), _cacheName, "RETRY_" + ((fbData == null) ? "MISS" : "HIT")))
            .toObservable()))
    return Single.just(data);
  }).map(data -> {
    if (data != null) {
      stats.cacheHit(Call.GET);
    if (event != null) endEvent(event);
    return data;
  }).onErrorReturn(ex -> {

代码示例来源:origin: apollographql/apollo-android

/**
 * Converts an {@link ApolloStoreOperation} to a Single.
 *
 * @param operation the ApolloStoreOperation to convert
 * @param <T>       the value type
 * @return the converted Single
 */
@NotNull public static <T> Single<T> from(@NotNull final ApolloStoreOperation<T> operation) {
 checkNotNull(operation, "operation == null");
 return Single.create(new Single.OnSubscribe<T>() {
  @Override
  public void call(final SingleSubscriber<? super T> subscriber) {
   operation.enqueue(new ApolloStoreOperation.Callback<T>() {
    @Override public void onSuccess(T result) {
     subscriber.onSuccess(result);
    }
    @Override public void onFailure(Throwable t) {
     subscriber.onError(t);
    }
   });
  }
 });
}

代码示例来源:origin: spring-projects/spring-framework

@PostMapping("/single")
public Completable createWithSingle(@RequestBody Single<Person> single) {
  return single.map(persons::add).toCompletable();
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
   .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  // Connect to the database
  jdbc.rxGetConnection().flatMap(conn -> {

   // Now chain some statements using flatmap composition
   Single<ResultSet> resa = conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))")
    .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')"))
    .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')"))
    .flatMap(result -> conn.rxQuery("SELECT * FROM test"));

   return resa.doAfterTerminate(conn::close);

  }).subscribe(resultSet -> {
   // Subscribe to the final result
   System.out.println("Results : " + resultSet.getRows());
  }, err -> {
   System.out.println("Database problem");
   err.printStackTrace();
  });
 }
}

代码示例来源:origin: sczyh30/vertx-blueprint-microservice

@Override
 public void stop(Future<Void> future) throws Exception {
  // TODO: to optimize.
  Observable.from(registeredRecords)
   .flatMap(record -> discovery.rxUnpublish(record.getRegistration()).toObservable())
   .reduce((Void) null, (a, b) -> null)
   .subscribe(future::complete, future::fail);
 }
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
  public void start() throws Exception {

    EventBus eb = vertx.eventBus();

    eb.consumer(ADDRESS)
        .toObservable()
        .subscribe(message -> {
          System.out.println("Received " + message.body());
          message.reply("PONG");
        });

    // Send a message every second
    vertx.setPeriodic(1000, v -> {
      eb.rxSend(ADDRESS, "PING")
          .subscribe(reply -> {
            System.out.println("Received reply " + reply.body());
          });
    });
  }
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
   .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  jdbc
   .rxGetConnection() // Connect to the database
   .flatMapObservable(conn -> { // With the connection...
    return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
     .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
     .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
     .flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
     .flatMapObservable(sqlRowStream -> {
      return sqlRowStream.toObservable() // Transform the stream into an Observable...
       .doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
     });
   }).subscribe(row -> System.out.println("Row : " + row.encode()));
 }
}

代码示例来源:origin: filippella/Dagger-Rx-Database-MVP

protected <T> void subscribe(Observable<T> observable, Observer<T> observer) {
    observable.subscribeOn(Schedulers.newThread())
        .toSingle()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
  }
}

代码示例来源:origin: sczyh30/vertx-blueprint-microservice

@Override
public Observable<CartEvent> streamByUser(String userId) {
 JsonArray params = new JsonArray().add(userId).add(userId);
 return client.rxGetConnection()
  .flatMapObservable(conn ->
   conn.rxQueryWithParams(STREAM_STATEMENT, params)
    .map(ResultSet::getRows)
    .flatMapObservable(Observable::from)
    .map(this::wrapCartEvent)
    .doOnTerminate(conn::close)
  );
}

代码示例来源:origin: PipelineAI/pipeline

return (Observable) res;
} else if (res instanceof Single) {
  return ((Single) res).toObservable();
} else if (res instanceof Completable) {
  return ((Completable) res).toObservable();
} else {
  return Observable.just(res);

代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway

private Single<DatabaseAccount> getDatabaseAccountAsync(URL serviceEndpoint) {
  try {
    return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint.toURI())
        .doOnNext(i -> logger.debug("account retrieved: {}", i)).toSingle();
  } catch (URISyntaxException e) {
    return Single.error(e);
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test() throws Exception {
  Observable.fromCallable(() -> { throw new IOException(); })
  .toSingle()
  .subscribeOn(Schedulers.computation())
  .toObservable()
  .toSingle()
  .onErrorResumeNext(v -> Single.just(1))
  .subscribe(System.out::println, Throwable::printStackTrace);
  Thread.sleep(1000);
}

代码示例来源:origin: georocket/georocket

/**
 * Determine the sizes of all given files
 * @param files the files
 * @param vertx the Vert.x instance
 * @return an observable that emits pairs of file names and sizes
 */
private Observable<Pair<String, Long>> getFileSizes(List<String> files, Vertx vertx) {
 FileSystem fs = vertx.fileSystem();
 return Observable.from(files)
  .flatMapSingle(path -> fs.rxProps(path).map(props -> Pair.of(path, props.size())));
}

代码示例来源:origin: vert-x3/vertx-rx

private Single<List<String>> inTransaction(Exception e) throws Exception {
  return client.rxGetConnection().flatMap(conn -> {
   return rxInsertExtraFolks(conn)
    .andThen(uniqueNames(conn))
    .<List<String>>collect(ArrayList::new, List::add).toSingle()
    .compose(upstream -> e == null ? upstream : upstream.flatMap(names -> Single.error(e)))
    .compose(SQLClientHelper.txSingleTransformer(conn))
    .flatMap(names -> rxAssertAutoCommit(conn).andThen(Single.just(names)))
    .doAfterTerminate(conn::close);
  });
 }
}

代码示例来源:origin: com.couchbase.client/core-io

@Override
public Observable<EndpointHealth> diagnostics() {
  List<Observable<EndpointHealth>> diags = new ArrayList<Observable<EndpointHealth>>();
  for (Endpoint endpoint : endpoints()) {
    diags.add(endpoint.diagnostics(type()).toObservable());
  }
  return Observable.merge(diags);
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {
  WebClient client = WebClient.create(vertx);
  Single<HttpResponse<String>> request = client.get(8080, "localhost", "/")
   .as(BodyCodec.string())
   .rxSend();

  // Fire the request
  request.subscribe(resp -> System.out.println("Server content " + resp.body()));

  // Again
  request.subscribe(resp -> System.out.println("Server content " + resp.body()));

  // And again
  request.subscribe(resp -> System.out.println("Server content " + resp.body()));
 }
}

代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway

static public <T> Observable<T> inlineIfPossibleAsObs(Func0<Observable<T>> function, IRetryPolicy retryPolicy) {

    if (retryPolicy == null) {
      // shortcut
      return Observable.defer(() -> {
        return function.call();
      });

    } else {
      return BackoffRetryUtility.executeRetry(() -> function.call().toSingle(), retryPolicy).toObservable();
    }
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_138() throws Exception {
  Single<Integer> ignored = Single
      .just(1)
      .toObservable()
      .ignoreElements()   //PROBLEM
      .toSingle();
}

代码示例来源:origin: ReactiveX/RxNetty

public SingleHostConnectionProvider(Observable<HostConnector<W, R>> connectors) {
  connectors.toSingle()
       .subscribe(new Action1<HostConnector<W, R>>() {
         @Override
         public void call(HostConnector<W, R> connector) {
           provider = connector.getConnectionProvider();
         }
       }, new Action1<Throwable>() {
         @Override
         public void call(Throwable t) {
           logger.error("Failed while fetching a host connector from a scalar host source", t);
         }
       });
}

相关文章