本文整理了Java中rx.Single
类的一些代码示例,展示了Single
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single
类的具体详情如下:
包路径:rx.Single
类名称:Single
暂无
代码示例来源:origin: vert-x3/vertx-examples
private void insertAndFind() {
// Documents to insert
Observable<JsonObject> documents = Observable.just(
new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
);
mongo.rxCreateCollection("users").flatMapObservable(v -> {
// After collection is created we insert each document
return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
}).doOnNext(id -> {
System.out.println("Inserted document " + id);
}).last().toSingle().flatMap(id -> {
// Everything has been inserted now we can query mongo
System.out.println("Insertions done");
return mongo.rxFind("users", new JsonObject());
}).subscribe(results -> {
System.out.println("Results " + results);
}, error -> {
System.out.println("Err");
error.printStackTrace();
});
}
}
代码示例来源:origin: Netflix/EVCache
public <T> Single<T> get(String key, Transcoder<T> tc, Scheduler scheduler) {
if (null == key) return Single.error(new IllegalArgumentException("Key cannot be null"));
if (client == null) {
increment("NULL_CLIENT");
return Single.error(new EVCacheException("Could not find a client to get the data APP " + _appName));
if (shouldThrottle(event)) {
increment("THROTTLED");
return Single.error(new EVCacheException("Request Throttled for app " + _appName + " & key " + key));
final boolean hasZF = hasZoneFallback();
boolean throwEx = hasZF ? false : throwExc;
return getData(client, canonicalKey, tc, throwEx, hasZF, scheduler).flatMap(data -> {
if (data == null && hasZF) {
final List<EVCacheClient> fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
if (fbClients != null && !fbClients.isEmpty()) {
return Observable.concat(Observable.from(fbClients).map(
fbClient -> getData(fbClients.indexOf(fbClient), fbClients.size(), fbClient, canonicalKey, tc, throwEx, throwExc, false, scheduler) //TODO : for the last one make sure to pass throwExc
.doOnSuccess(fbData -> increment(fbClient.getServerGroupName(), _cacheName, "RETRY_" + ((fbData == null) ? "MISS" : "HIT")))
.toObservable()))
return Single.just(data);
}).map(data -> {
if (data != null) {
stats.cacheHit(Call.GET);
if (event != null) endEvent(event);
return data;
}).onErrorReturn(ex -> {
代码示例来源:origin: apollographql/apollo-android
/**
* Converts an {@link ApolloStoreOperation} to a Single.
*
* @param operation the ApolloStoreOperation to convert
* @param <T> the value type
* @return the converted Single
*/
@NotNull public static <T> Single<T> from(@NotNull final ApolloStoreOperation<T> operation) {
checkNotNull(operation, "operation == null");
return Single.create(new Single.OnSubscribe<T>() {
@Override
public void call(final SingleSubscriber<? super T> subscriber) {
operation.enqueue(new ApolloStoreOperation.Callback<T>() {
@Override public void onSuccess(T result) {
subscriber.onSuccess(result);
}
@Override public void onFailure(Throwable t) {
subscriber.onError(t);
}
});
}
});
}
代码示例来源:origin: spring-projects/spring-framework
@PostMapping("/single")
public Completable createWithSingle(@RequestBody Single<Person> single) {
return single.map(persons::add).toCompletable();
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver");
JDBCClient jdbc = JDBCClient.createShared(vertx, config);
// Connect to the database
jdbc.rxGetConnection().flatMap(conn -> {
// Now chain some statements using flatmap composition
Single<ResultSet> resa = conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))")
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')"))
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')"))
.flatMap(result -> conn.rxQuery("SELECT * FROM test"));
return resa.doAfterTerminate(conn::close);
}).subscribe(resultSet -> {
// Subscribe to the final result
System.out.println("Results : " + resultSet.getRows());
}, err -> {
System.out.println("Database problem");
err.printStackTrace();
});
}
}
代码示例来源:origin: sczyh30/vertx-blueprint-microservice
@Override
public void stop(Future<Void> future) throws Exception {
// TODO: to optimize.
Observable.from(registeredRecords)
.flatMap(record -> discovery.rxUnpublish(record.getRegistration()).toObservable())
.reduce((Void) null, (a, b) -> null)
.subscribe(future::complete, future::fail);
}
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
EventBus eb = vertx.eventBus();
eb.consumer(ADDRESS)
.toObservable()
.subscribe(message -> {
System.out.println("Received " + message.body());
message.reply("PONG");
});
// Send a message every second
vertx.setPeriodic(1000, v -> {
eb.rxSend(ADDRESS, "PING")
.subscribe(reply -> {
System.out.println("Received reply " + reply.body());
});
});
}
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver");
JDBCClient jdbc = JDBCClient.createShared(vertx, config);
jdbc
.rxGetConnection() // Connect to the database
.flatMapObservable(conn -> { // With the connection...
return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
.flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
.flatMapObservable(sqlRowStream -> {
return sqlRowStream.toObservable() // Transform the stream into an Observable...
.doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
});
}).subscribe(row -> System.out.println("Row : " + row.encode()));
}
}
代码示例来源:origin: filippella/Dagger-Rx-Database-MVP
protected <T> void subscribe(Observable<T> observable, Observer<T> observer) {
observable.subscribeOn(Schedulers.newThread())
.toSingle()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
}
代码示例来源:origin: sczyh30/vertx-blueprint-microservice
@Override
public Observable<CartEvent> streamByUser(String userId) {
JsonArray params = new JsonArray().add(userId).add(userId);
return client.rxGetConnection()
.flatMapObservable(conn ->
conn.rxQueryWithParams(STREAM_STATEMENT, params)
.map(ResultSet::getRows)
.flatMapObservable(Observable::from)
.map(this::wrapCartEvent)
.doOnTerminate(conn::close)
);
}
代码示例来源:origin: PipelineAI/pipeline
return (Observable) res;
} else if (res instanceof Single) {
return ((Single) res).toObservable();
} else if (res instanceof Completable) {
return ((Completable) res).toObservable();
} else {
return Observable.just(res);
代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway
private Single<DatabaseAccount> getDatabaseAccountAsync(URL serviceEndpoint) {
try {
return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint.toURI())
.doOnNext(i -> logger.debug("account retrieved: {}", i)).toSingle();
} catch (URISyntaxException e) {
return Single.error(e);
}
}
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void test() throws Exception {
Observable.fromCallable(() -> { throw new IOException(); })
.toSingle()
.subscribeOn(Schedulers.computation())
.toObservable()
.toSingle()
.onErrorResumeNext(v -> Single.just(1))
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(1000);
}
代码示例来源:origin: georocket/georocket
/**
* Determine the sizes of all given files
* @param files the files
* @param vertx the Vert.x instance
* @return an observable that emits pairs of file names and sizes
*/
private Observable<Pair<String, Long>> getFileSizes(List<String> files, Vertx vertx) {
FileSystem fs = vertx.fileSystem();
return Observable.from(files)
.flatMapSingle(path -> fs.rxProps(path).map(props -> Pair.of(path, props.size())));
}
代码示例来源:origin: vert-x3/vertx-rx
private Single<List<String>> inTransaction(Exception e) throws Exception {
return client.rxGetConnection().flatMap(conn -> {
return rxInsertExtraFolks(conn)
.andThen(uniqueNames(conn))
.<List<String>>collect(ArrayList::new, List::add).toSingle()
.compose(upstream -> e == null ? upstream : upstream.flatMap(names -> Single.error(e)))
.compose(SQLClientHelper.txSingleTransformer(conn))
.flatMap(names -> rxAssertAutoCommit(conn).andThen(Single.just(names)))
.doAfterTerminate(conn::close);
});
}
}
代码示例来源:origin: com.couchbase.client/core-io
@Override
public Observable<EndpointHealth> diagnostics() {
List<Observable<EndpointHealth>> diags = new ArrayList<Observable<EndpointHealth>>();
for (Endpoint endpoint : endpoints()) {
diags.add(endpoint.diagnostics(type()).toObservable());
}
return Observable.merge(diags);
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
WebClient client = WebClient.create(vertx);
Single<HttpResponse<String>> request = client.get(8080, "localhost", "/")
.as(BodyCodec.string())
.rxSend();
// Fire the request
request.subscribe(resp -> System.out.println("Server content " + resp.body()));
// Again
request.subscribe(resp -> System.out.println("Server content " + resp.body()));
// And again
request.subscribe(resp -> System.out.println("Server content " + resp.body()));
}
}
代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway
static public <T> Observable<T> inlineIfPossibleAsObs(Func0<Observable<T>> function, IRetryPolicy retryPolicy) {
if (retryPolicy == null) {
// shortcut
return Observable.defer(() -> {
return function.call();
});
} else {
return BackoffRetryUtility.executeRetry(() -> function.call().toSingle(), retryPolicy).toObservable();
}
}
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_138() throws Exception {
Single<Integer> ignored = Single
.just(1)
.toObservable()
.ignoreElements() //PROBLEM
.toSingle();
}
代码示例来源:origin: ReactiveX/RxNetty
public SingleHostConnectionProvider(Observable<HostConnector<W, R>> connectors) {
connectors.toSingle()
.subscribe(new Action1<HostConnector<W, R>>() {
@Override
public void call(HostConnector<W, R> connector) {
provider = connector.getConnectionProvider();
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable t) {
logger.error("Failed while fetching a host connector from a scalar host source", t);
}
});
}
内容来源于网络,如有侵权,请联系作者删除!