rx.Single.flatMap()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(176)

本文整理了Java中rx.Single.flatMap()方法的一些代码示例,展示了Single.flatMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.flatMap()方法的具体详情如下:
包路径:rx.Single
类名称:Single
方法名:flatMap

Single.flatMap介绍

暂无

代码示例

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
   .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  jdbc
   .rxGetConnection() // Connect to the database
   .flatMapObservable(conn -> { // With the connection...
    return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
     .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
     .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
     .flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
     .flatMapObservable(sqlRowStream -> {
      return sqlRowStream.toObservable() // Transform the stream into an Observable...
       .doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
     });
   }).subscribe(row -> System.out.println("Row : " + row.encode()));
 }
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
   .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  // Connect to the database
  jdbc.rxGetConnection().flatMap(conn -> {

   // Now chain some statements using flatmap composition
   Single<ResultSet> resa = conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))")
    .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')"))
    .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')"))
    .flatMap(result -> conn.rxQuery("SELECT * FROM test"));

   return resa.doAfterTerminate(conn::close);

  }).subscribe(resultSet -> {
   // Subscribe to the final result
   System.out.println("Results : " + resultSet.getRows());
  }, err -> {
   System.out.println("Database problem");
   err.printStackTrace();
  });
 }
}

代码示例来源:origin: vert-x3/vertx-examples

.flatMap(conn ->
 conn
    .flatMap(updateResult -> conn.rxUpdateWithParams("INSERT INTO colors (name) VALUES (?)", new JsonArray().add("WHITE")))
    .flatMap(updateResult -> conn.rxUpdateWithParams("INSERT INTO colors (name) VALUES (?)", new JsonArray().add("PURPLE")))
  .flatMap(updateResult -> conn.rxQuery("SELECT * FROM colors"))

代码示例来源:origin: vert-x3/vertx-examples

private void insertAndFind() {
  // Documents to insert
  Observable<JsonObject> documents = Observable.just(
   new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
   new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
  );

  mongo.rxCreateCollection("users").flatMapObservable(v -> {
   // After collection is created we insert each document
   return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
  }).doOnNext(id -> {
   System.out.println("Inserted document " + id);
  }).last().toSingle().flatMap(id -> {
   // Everything has been inserted now we can query mongo
   System.out.println("Insertions done");
   return mongo.rxFind("users", new JsonObject());
  }).subscribe(results -> {
   System.out.println("Results " + results);
  }, error -> {
   System.out.println("Err");
   error.printStackTrace();
  });
 }
}

代码示例来源:origin: Netflix/EVCache

.flatMap(metadataMap -> {
  if (metadataMap == null) return null;

代码示例来源:origin: Netflix/EVCache

return vals.flatMap(r -> {
  HashMap<String, T> returnVal = new HashMap<String, T>();
  for(Entry<String, Object> entry : r.entrySet()) {

代码示例来源:origin: Netflix/EVCache

private <T> Single<T> assembleChunks(String key, boolean touch, int ttl, Transcoder<T> tc, boolean hasZF, Scheduler scheduler) {
  final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "LatencyChunk").start();
  return getChunkDetails(key, scheduler).flatMap(cd -> {
    if (cd == null) return Single.just(null);
    if (!cd.isChunked()) {

代码示例来源:origin: Netflix/EVCache

final boolean hasZF = hasZoneFallback();
boolean throwEx = hasZF ? false : throwExc;
return getData(client, canonicalKey, tc, throwEx, hasZF, scheduler).flatMap(data -> {
  if (data == null && hasZF) {
    final List<EVCacheClient> fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());

代码示例来源:origin: Netflix/EVCache

boolean throwEx = hasZF ? false : throwExc;
return getData(client, canonicalKey, tc, throwEx, hasZF, scheduler).flatMap(data -> {
  if (data == null && hasZF) {
    final List<EVCacheClient> fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());

代码示例来源:origin: Netflix/EVCache

if(ignoreTouch.get()) {
  final Single<Object> value = _client.asyncGet(hKey, evcacheValueTranscoder, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler);
  return value.flatMap(r -> {
    final CASValue<Object> rObj = (CASValue<Object>)r;
    final EVCacheValue val = (EVCacheValue)rObj.getValue();
  final Single<CASValue<Object>> value = _client.asyncGetAndTouch(hKey, timeToLive, evcacheValueTranscoder).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler);
  if(value != null ) {
    return value.flatMap(r -> {
      final CASValue<Object> rObj = (CASValue<Object>)r;
      final EVCacheValue val = (EVCacheValue)rObj.getValue();

代码示例来源:origin: io.vertx/vertx-rx-java

/**
 * Generates a {@link Single} from {@link SQLConnection} operations.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning a {@link Single} generated by interacting with the given {@link SQLConnection}
 * @param <T> the type of the item emitted by the {@link Single}
 * @return a {@link Single} generated from {@link SQLConnection} operations
 */
public static <T> Single<T> usingConnectionSingle(SQLClient client, Function<SQLConnection, Single<T>> sourceSupplier) {
 return client.rxGetConnection().flatMap(conn -> {
  return sourceSupplier.apply(conn).doAfterTerminate(conn::close);
 });
}

代码示例来源:origin: vert-x3/vertx-rx

/**
 * Generates a {@link Single} from {@link SQLConnection} operations.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning a {@link Single} generated by interacting with the given {@link SQLConnection}
 * @param <T> the type of the item emitted by the {@link Single}
 * @return a {@link Single} generated from {@link SQLConnection} operations
 */
public static <T> Single<T> usingConnectionSingle(SQLClient client, Function<SQLConnection, Single<T>> sourceSupplier) {
 return client.rxGetConnection().flatMap(conn -> {
  return sourceSupplier.apply(conn).doAfterTerminate(conn::close);
 });
}

代码示例来源:origin: hawkular/hawkular-metrics

private Completable doPostJobExecutionWithoutRescheduling(Completable job, JobDetailsImpl jobDetails,
    Date timeSlice, Set<UUID> activeJobs) {
  return job
      .toSingle(() -> new JobExecutionState(jobDetails, activeJobs))
      .flatMap(this::releaseJobExecutionLock)
      .flatMap(this::setJobFinished)
      .doOnError(t -> {
        logger.debug("There was an error during post-job execution, but the job has already been " +
            "rescheduled.", t);
        publishJobFinished(jobDetails);
      })
      .doOnSuccess(states -> publishJobFinished(states.currentDetails))
      .toCompletable();
}

代码示例来源:origin: sczyh30/vertx-blueprint-microservice

public CartEventDataSourceImpl(io.vertx.core.Vertx vertx, JsonObject json) {
 this.client = JDBCClient.createNonShared(Vertx.newInstance(vertx), json);
 // TODO: Should not init the table here.
 client.rxGetConnection()
  .flatMap(connection ->
   connection.rxExecute(INIT_STATEMENT)
    .doAfterTerminate(connection::close)
  )
  .subscribe();
}

代码示例来源:origin: sczyh30/vertx-blueprint-microservice

@Override
public Single<Void> save(CartEvent cartEvent) {
 JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name())
  .add(cartEvent.getUserId())
  .add(cartEvent.getProductId())
  .add(cartEvent.getAmount())
  .add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis());
 return client.rxGetConnection()
  .flatMap(conn -> conn.rxUpdateWithParams(SAVE_STATEMENT, params)
   .map(r -> (Void) null)
   .doAfterTerminate(conn::close)
  );
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_55() throws Exception {
  Single<String> example =
      fetch("http://www.example.com")
          .flatMap(this::body);
  String b = example.toBlocking().value();
}

代码示例来源:origin: vert-x3/vertx-rx

@Override
 public Single<T> call(Single<T> upstream) {
  return sqlConnection.rxSetAutoCommit(false).toCompletable()
   .andThen(upstream)
   .flatMap(item -> sqlConnection.rxCommit().toCompletable().andThen(Single.just(item)))
   .onErrorResumeNext(throwable -> {
    return sqlConnection.rxRollback().toCompletable().onErrorComplete()
     .andThen(sqlConnection.rxSetAutoCommit(true).toCompletable().onErrorComplete())
     .andThen(Single.error(throwable));
   }).flatMap(item -> sqlConnection.rxSetAutoCommit(true).toCompletable().andThen(Single.just(item)));
 }
}

代码示例来源:origin: io.vertx/vertx-rx-java

@Override
 public Single<T> call(Single<T> upstream) {
  return sqlConnection.rxSetAutoCommit(false).toCompletable()
   .andThen(upstream)
   .flatMap(item -> sqlConnection.rxCommit().toCompletable().andThen(Single.just(item)))
   .onErrorResumeNext(throwable -> {
    return sqlConnection.rxRollback().toCompletable().onErrorComplete()
     .andThen(sqlConnection.rxSetAutoCommit(true).toCompletable().onErrorComplete())
     .andThen(Single.error(throwable));
   }).flatMap(item -> sqlConnection.rxSetAutoCommit(true).toCompletable().andThen(Single.just(item)));
 }
}

代码示例来源:origin: vert-x3/vertx-rx

private Single<List<String>> inTransaction(Exception e) throws Exception {
  return client.rxGetConnection().flatMap(conn -> {
   return rxInsertExtraFolks(conn)
    .andThen(uniqueNames(conn))
    .<List<String>>collect(ArrayList::new, List::add).toSingle()
    .compose(upstream -> e == null ? upstream : upstream.flatMap(names -> Single.error(e)))
    .compose(SQLClientHelper.txSingleTransformer(conn))
    .flatMap(names -> rxAssertAutoCommit(conn).andThen(Single.just(names)))
    .doAfterTerminate(conn::close);
  });
 }
}

代码示例来源:origin: io.vertx/vertx-junit5

@Test
@DisplayName("Check the deployment and interaction of a Rx1 verticle")
void check_deployment_and_message_send(Vertx vertx, VertxTestContext testContext) {
 RxHelper
  .deployVerticle(vertx, new RxVerticle())
  .toSingle()
  .flatMap(id -> vertx.eventBus().rxSend("check", "Ok?"))
  .subscribe(
   message -> testContext.verify(() -> {
    assertThat(message.body()).isEqualTo("Check!");
    testContext.completeNow();
   }),
   testContext::failNow);
}

相关文章