本文整理了Java中io.reactivex.Single.ignoreElement()
方法的一些代码示例,展示了Single.ignoreElement()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.ignoreElement()
方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:ignoreElement
[英]Returns a Completable that ignores the success value of this Singleand calls onComplete instead on the returned Completable.
Scheduler: ignoreElement does not operate by default on a particular Scheduler.
[中]返回一个Completable,该Completable忽略此single的success值,并对返回的Completable调用onComplete。
调度器:默认情况下,ignoreElement不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test
public void ignoreElement() {
Single.just(1)
.ignoreElement()
.test()
.assertResult();
Single.error(new TestException())
.ignoreElement()
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: mqtt-bee/mqtt-bee
@Override
public @NotNull Completable unsubscribe(final @Nullable Mqtt3Unsubscribe unsubscribe) {
return delegate.unsubscribe(MqttChecks.unsubscribe(unsubscribe)).ignoreElement()
.onErrorResumeNext(EXCEPTION_MAPPER_COMPLETABLE);
}
代码示例来源:origin: br.com.thiaguten/rx-mqtt-api
default Completable offAndClose(String... topics) {
Completable close = close();
Completable disconnForcibly = disconnectForcibly().onErrorResumeNext(e -> close);
Completable disconnect = disconnect().ignoreElement();
Completable off = off(topics).ignoreElement().onErrorResumeNext(e -> disconnect);
return isConnected()
.flatMapCompletable(connected -> connected ? off : close)
.andThen(disconnect).onErrorResumeNext(e -> disconnForcibly)
.andThen(close);
}
代码示例来源:origin: FroMage/redpipe
if(serviceName != null) {
Record record = HttpEndpoint.createRecord(serviceName.value(), host, port, path.value());
records = records.andThen(discovery.rxPublish(record).ignoreElement());
records = records.andThen(discovery.rxPublish(record).ignoreElement());
代码示例来源:origin: net.redpipe/redpipe-engine
@Override
public Completable send(Mail email) {
Single<Optional<Buffer>> htmlRender = email.renderHtml().map(buffer -> Optional.of(buffer)).toSingle(Optional.empty());
Single<Buffer> textRender = email.renderText();
return Single.zip(textRender, htmlRender, (text, html) -> {
MailMessage message = new MailMessage();
message.setFrom(email.from);
if(email.to != null)
message.setTo(Arrays.asList(email.to));
if(email.cc != null)
message.setCc(Arrays.asList(email.cc));
if(email.bcc != null)
message.setBcc(Arrays.asList(email.bcc));
message.setSubject(email.subject);
message.setText(text.toString());
if(html.isPresent())
message.setHtml(html.get().toString());
return mailClient.rxSendMail(message).ignoreElement();
}).flatMapCompletable(c -> c);
}
}
代码示例来源:origin: FroMage/redpipe
@Override
public Completable send(Mail email) {
Single<Optional<Buffer>> htmlRender = email.renderHtml().map(buffer -> Optional.of(buffer)).toSingle(Optional.empty());
Single<Buffer> textRender = email.renderText();
return Single.zip(textRender, htmlRender, (text, html) -> {
MailMessage message = new MailMessage();
message.setFrom(email.from);
if(email.to != null)
message.setTo(Arrays.asList(email.to));
if(email.cc != null)
message.setCc(Arrays.asList(email.cc));
if(email.bcc != null)
message.setBcc(Arrays.asList(email.bcc));
message.setSubject(email.subject);
message.setText(text.toString());
if(html.isPresent())
message.setHtml(html.get().toString());
return mailClient.rxSendMail(message).ignoreElement();
}).flatMapCompletable(c -> c);
}
}
代码示例来源:origin: d4rken/RxShell
public synchronized Completable close() {
if (RXSDebug.isDebug()) Timber.tag(TAG).v("close()");
if (session == null) return Completable.complete();
else return session.flatMapCompletable(s -> s.destroy().andThen(s.waitFor().ignoreElement()));
}
代码示例来源:origin: radixdlt/radixdlt-java
/**
* Attempts to connect to this Radix node on subscribe if not already connected
*
* @return completable which signifies when connection has been made
*/
public Completable connect() {
return this.getStatus()
.doOnNext(status -> {
// TODO: cancel tryConnect on dispose
if (status.equals(RadixClientStatus.CLOSED)) {
this.tryConnect();
} else if (status.equals(RadixClientStatus.FAILURE)) {
throw Exceptions.propagate(new IOException(this.endpoint + ": connection failure"));
}
})
.filter(status -> status.equals(RadixClientStatus.OPEN))
.firstOrError()
.ignoreElement();
}
代码示例来源:origin: tsegismont/vertx-musicstore
@Override
public void start(Future<Void> startFuture) throws Exception {
datasourceConfig = new DatasourceConfig(config().getJsonObject("datasource", new JsonObject()));
dbClient = JDBCClient.createShared(vertx, datasourceConfig.toJson(), "MusicStoreDS");
templateEngine = FreeMarkerTemplateEngine.create(vertx);
String connectionString = config().getJsonObject("mongo", new JsonObject()).getString("url", "mongodb://localhost");
mongoClient = MongoClients.create(connectionString);
mongoDatabase = mongoClient.getDatabase("music");
Completable databaseSetup = updateDB()
.andThen(loadDbQueries()).doOnSuccess(props -> dbQueries = props)
.ignoreElement();
databaseSetup
.andThen(Completable.fromAction(() -> setupAuthProvider()))
.andThen(Completable.defer(() -> setupWebServer()))
.subscribe(CompletableHelper.toObserver(startFuture));
}
代码示例来源:origin: FroMage/redpipe
@Override
public Completable init() {
return Completable.defer(() -> {
// Setup the Vertx-CDI integration
VertxExtension vertxExtension = CDI.current().select(VertxExtension.class).get();
BeanManager beanManager = CDI.current().getBeanManager();
// has to be done in a blocking thread
Vertx vertx = AppGlobals.get().getVertx();
return vertx.rxExecuteBlocking(future -> {
vertxExtension.registerConsumers(vertx.getDelegate(), BeanManagerProxy.unwrap(beanManager).event());
future.complete();
}).ignoreElement();
});
}
代码示例来源:origin: mqtt-bee/mqtt-bee
return connectScenario.ignoreElement().andThen(publishScenario).ignoreElements().andThen(disconnectScenario);
代码示例来源:origin: net.redpipe/redpipe-engine
private Completable startVertx(VertxResteasyDeployment deployment)
{
return Completable.defer(() -> {
Router router = Router.router(vertx);
AppGlobals globals = AppGlobals.get();
globals.setRouter(router);
VertxPluginRequestHandler resteasyHandler = new VertxPluginRequestHandler(vertx, deployment, plugins);
return doOnPlugins(plugin -> plugin.preRoute())
.doOnComplete(() -> {
setupRoutes(router);
router.route().handler(routingContext -> {
ResteasyProviderFactory.pushContext(RoutingContext.class, routingContext);
ResteasyProviderFactory.pushContext(io.vertx.rxjava.ext.web.RoutingContext.class,
io.vertx.rxjava.ext.web.RoutingContext.newInstance(routingContext.getDelegate()));
resteasyHandler.handle(routingContext.request());
});
}).concatWith(doOnPlugins(plugin -> plugin.postRoute()))
.concatWith(Completable.defer(() -> {
// Start the front end server using the Jax-RS controller
int port = globals.getConfig().getInteger("http_port", 9000);
String host = globals.getConfig().getString("http_host", NetServerOptions.DEFAULT_HOST);
return vertx.createHttpServer()
.requestHandler(router::accept)
.rxListen(port, host)
.doOnSuccess(server -> System.out.println("Server started on port " + server.actualPort()))
.doOnError(t -> t.printStackTrace())
.ignoreElement();
}));
});
}
代码示例来源:origin: mqtt-bee/mqtt-bee
return connectScenario.ignoreElement()
.andThen(subscribeScenario)
.take(countToPublish)
代码示例来源:origin: FroMage/redpipe
private Completable startVertx(VertxResteasyDeployment deployment)
{
return Completable.defer(() -> {
Router router = Router.router(vertx);
AppGlobals globals = AppGlobals.get();
globals.setRouter(router);
VertxPluginRequestHandler resteasyHandler = new VertxPluginRequestHandler(vertx, deployment, plugins);
return doOnPlugins(plugin -> plugin.preRoute())
.doOnComplete(() -> {
setupRoutes(router);
router.route().handler(routingContext -> {
ResteasyProviderFactory.pushContext(RoutingContext.class, routingContext);
ResteasyProviderFactory.pushContext(io.vertx.rxjava.ext.web.RoutingContext.class,
io.vertx.rxjava.ext.web.RoutingContext.newInstance(routingContext.getDelegate()));
resteasyHandler.handle(routingContext.request());
});
}).concatWith(doOnPlugins(plugin -> plugin.postRoute()))
.concatWith(Completable.defer(() -> {
// Start the front end server using the Jax-RS controller
int port = globals.getConfig().getInteger("http_port", 9000);
String host = globals.getConfig().getString("http_host", NetServerOptions.DEFAULT_HOST);
return vertx.createHttpServer()
.requestHandler(router::accept)
.rxListen(port, host)
.doOnSuccess(server -> System.out.println("Server started on port " + server.actualPort()))
.doOnError(t -> t.printStackTrace())
.ignoreElement();
}));
});
}
代码示例来源:origin: mqtt-bee/mqtt-bee
@Test
void connect() {
final Mqtt5MessageException mqtt5MessageException =
new Mqtt5DisconnectException(MqttDisconnect.DEFAULT, "reason from original exception");
given(mqtt5Client.connect(any())).willReturn(Single.error(mqtt5MessageException));
final Mqtt3Connect connect = Mqtt3Connect.builder().build();
assertMqtt3Exception(() -> mqtt3Client.connect(connect).ignoreElement().blockingAwait(), mqtt5MessageException);
}
代码示例来源:origin: mqtt-bee/mqtt-bee
@Test
void subscribe() {
final Mqtt5MessageException mqtt5MessageException =
new Mqtt5DisconnectException(MqttDisconnect.DEFAULT, "reason from original exception");
given(mqtt5Client.subscribe(any())).willReturn(Single.error(mqtt5MessageException));
final Mqtt3Subscribe subscribe = Mqtt3Subscribe.builder()
.addSubscription(Mqtt3Subscription.builder().topicFilter("topic").qos(MqttQos.AT_LEAST_ONCE).build())
.build();
assertMqtt3Exception(
() -> mqtt3Client.subscribe(subscribe).ignoreElement().blockingAwait(),
mqtt5MessageException);
}
代码示例来源:origin: tsegismont/vertx-musicstore
.requestHandler(router::accept)
.rxListen(8080)
.ignoreElement();
return completable;
内容来源于网络,如有侵权,请联系作者删除!