io.reactivex.Single.doAfterTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(139)

本文整理了Java中io.reactivex.Single.doAfterTerminate()方法的一些代码示例,展示了Single.doAfterTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.doAfterTerminate()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:doAfterTerminate

Single.doAfterTerminate介绍

[英]Registers an Action to be called after this Single invokes either onSuccess or onError. *

Note that the doAfterTerminate action is shared between subscriptions and as such should be thread-safe.

Scheduler: doAfterTerminate does not operate by default on a particular Scheduler.

History: 2.0.6 - experimental
[中]在调用onSuccess或onError之后注册要调用的操作。*
请注意,doAfterTerminate操作在订阅之间共享,因此应该是线程安全的。
Scheduler:doAfterTerminate默认情况下不会在特定的计划程序上运行。
历史:2.0.6-实验性

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public SingleSource<Integer> apply(Single<Integer> m) throws Exception {
    return m.doAfterTerminate(afterTerminate);
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void afterTerminateActionNull() {
  Single.just(1).doAfterTerminate(null);
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
   .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  // Connect to the database
  jdbc.rxGetConnection().flatMap(conn -> {

   // Now chain some statements using flatmap composition
   Single<ResultSet> resa = conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))")
    .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')"))
    .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')"))
    .flatMap(result -> conn.rxQuery("SELECT * FROM test"));

   return resa.doAfterTerminate(conn::close);

  }).subscribe(resultSet -> {
   // Subscribe to the final result
   System.out.println("Results : " + resultSet.getRows());
  }, err -> {
   System.out.println("Database problem");
   err.printStackTrace();
  });
 }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(PublishSubject.<Integer>create().singleOrError().doAfterTerminate(afterTerminate));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void just() {
  Single.just(1)
  .doAfterTerminate(afterTerminate)
  .subscribeWith(to)
  .assertResult(1);
  assertAfterTerminateCalledOnce();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Single.<Integer>error(new TestException())
  .doAfterTerminate(afterTerminate)
  .subscribeWith(to)
  .assertFailure(TestException.class);
  assertAfterTerminateCalledOnce();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void justConditional() {
  Single.just(1)
  .doAfterTerminate(afterTerminate)
  .filter(Functions.alwaysTrue())
  .subscribeWith(to)
  .assertResult(1);
  assertAfterTerminateCalledOnce();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void errorConditional() {
  Single.<Integer>error(new TestException())
  .doAfterTerminate(afterTerminate)
  .filter(Functions.alwaysTrue())
  .subscribeWith(to)
  .assertFailure(TestException.class);
  assertAfterTerminateCalledOnce();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void actionThrows() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Single.just(1)
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new TestException();
      }
    })
    .test()
    .assertResult(1);
    TestHelper.assertUndeliverable(errors, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: io.github.jklingsporn/vertx-jooq-async-rx

private <R> io.reactivex.functions.Function<SQLConnection, Single<? extends  R>> executeAndClose(Function<SQLConnection, Single<? extends R>> func) {
  return sqlConnection -> func.apply(sqlConnection).doAfterTerminate(sqlConnection::close);
}

代码示例来源:origin: jklingsporn/vertx-jooq

protected <R> io.reactivex.functions.Function<io.vertx.reactivex.ext.sql.SQLConnection, Single<? extends  R>> executeAndClose(Function<io.vertx.reactivex.ext.sql.SQLConnection, Single<? extends R>> func) {
  return sqlConnection -> func.apply(sqlConnection).doAfterTerminate(sqlConnection::close);
}

代码示例来源:origin: jklingsporn/vertx-jooq-async

private <R> io.reactivex.functions.Function<SQLConnection, Single<? extends  R>> executeAndClose(Function<SQLConnection, Single<? extends R>> func) {
  return sqlConnection -> func.apply(sqlConnection).doAfterTerminate(sqlConnection::close);
}

代码示例来源:origin: cescoffier/vertx-kubernetes-workshop

private void storeInDatabase(JsonObject operation) {
  // 1. need to retrieve a connection
  // 2. execute the insertion statement
  // 3. close the connection
  // Step 1 get the connection
  Single<SQLConnection> connectionRetrieved = jdbc.rxGetConnection();
  // Step 2, when the connection is retrieved (this may have failed), do the insertion (upon success)
  Single<UpdateResult> update = connectionRetrieved
    .flatMap(connection ->
      connection.rxUpdateWithParams(INSERT_STATEMENT, new JsonArray().add(operation.encode()))
        // Step 3, when the insertion is done, close the connection.
        .doAfterTerminate(connection::close));
  update.subscribe(result -> {
    // Ok
  }, err -> {
    System.err.println("Failed to insert operation in database: " + err);
  });
}

代码示例来源:origin: FroMage/redpipe

private Single<String> get(Vertx vertx, URI uri){
  WebClient client = WebClient.create(vertx);
  Single<HttpResponse<Buffer>> responseHandler = 
      client.get(uri.getPort(), uri.getHost(), uri.getPath()).rxSend();
  return responseHandler.map(response -> response.body().toString())
      .doAfterTerminate(() -> client.close());
}

代码示例来源:origin: FroMage/redpipe

@Path("8user")
@Produces("text/json")
@GET
public Single<DataClass> hello8User(@Context io.vertx.reactivex.core.Vertx rxVertx){
  System.err.println("Creating client");
  WebClientOptions options = new WebClientOptions();
  options.setSsl(true);
  options.setTrustAll(true);
  options.setVerifyHost(false);
  WebClient client = WebClient.create(rxVertx, options);
  Single<HttpResponse<io.vertx.reactivex.core.buffer.Buffer>> responseHandler = client.get(443,
      "www.google.com", 
      "/robots.txt").rxSend();
  System.err.println("Created client");
  return responseHandler.map(body -> {
    System.err.println("Got body");
    return new DataClass(body.body().toString());
  }).doAfterTerminate(() -> client.close());
}

代码示例来源:origin: FroMage/redpipe

@Path("8")
@GET
public Single<String> hello8(@Context io.vertx.reactivex.core.Vertx rxVertx){
  System.err.println("Creating client");
  WebClientOptions options = new WebClientOptions();
  options.setSsl(true);
  options.setTrustAll(true);
  options.setVerifyHost(false);
  WebClient client = WebClient.create(rxVertx, options);
  Single<HttpResponse<io.vertx.reactivex.core.buffer.Buffer>> responseHandler = client.get(443,
      "www.google.com", 
      "/robots.txt").rxSend();
  System.err.println("Created client");
  return responseHandler.map(body -> {
    System.err.println("Got body");
    return body.body().toString();
  }).doAfterTerminate(() -> client.close());
}

代码示例来源:origin: net.redpipe/redpipe-engine

public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
  return Single.defer(() -> {
    Single<SQLConnection> connection = getConnection();
    return connection.flatMap(conn -> {
      return func.call(conn).doAfterTerminate(() -> {
        conn.close();
      });
    });
  });
}

代码示例来源:origin: FroMage/redpipe

public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
  return Single.defer(() -> {
    Single<SQLConnection> connection = getConnection();
    return connection.flatMap(conn -> {
      return func.call(conn).doAfterTerminate(() -> {
        conn.close();
      });
    });
  });
}

代码示例来源:origin: tsegismont/vertx-musicstore

@Override
public void handle(RoutingContext rc) {
 dbClient.rxGetConnection().flatMap(sqlConnection -> {
  return findGenres(sqlConnection).doAfterTerminate(sqlConnection::close);
 }).flatMap(genres -> {
  rc.put("genres", genres);
  return templateEngine.rxRender(rc.data(), "templates/index");
 }).subscribe(rc.response()::end, rc::fail);
}

代码示例来源:origin: tsegismont/vertx-musicstore

private Maybe<Buffer> download(Long albumId) {
 return dbClient.rxGetConnection().flatMap(sqlConnection -> {
  return findAlbum(sqlConnection, albumId).doAfterTerminate(sqlConnection::close);
 }).flatMapMaybe(album -> {
  String mbAlbumId = album.getString("mbAlbumId");
  return mbAlbumId == null ? Maybe.empty() : Maybe.just(mbAlbumId);
 }).flatMap(mbAlbumId -> {
  return sendGetRequest(mbAlbumId).toMaybe();
 });
}

相关文章

微信公众号

最新文章

更多