本文整理了Java中io.reactivex.Single.defer()
方法的一些代码示例,展示了Single.defer()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.defer()
方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:defer
[英]Calls a Callable for each individual SingleObserver to return the actual Single source to be subscribed to. Scheduler: defer does not operate by default on a particular Scheduler.
[中]为每个单独的SingleObserver调用Callable,以返回要订阅的实际单个源。调度程序:默认情况下,延迟不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void deferNull() {
Single.defer(null);
}
代码示例来源:origin: Polidea/RxAndroidBle
final Scheduler timeoutScheduler
) {
return Single.defer(new Callable<SingleSource<? extends RxBleDeviceServices>>() {
@Override
public SingleSource<? extends RxBleDeviceServices> call() throws Exception {
代码示例来源:origin: JessYanCoding/MVPArms
@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {
// 此处在调用 serviceClass 中的方法时触发
if (method.getReturnType() == Observable.class) {
// 如果方法返回值是 Observable 的话,则包一层再返回,
// 只包一层 defer 由外部去控制耗时方法以及网络请求所处线程,
// 如此对原项目的影响为 0,且更可控。
return Observable.defer(() -> {
final T service = getRetrofitService(serviceClass);
// 执行真正的 Retrofit 动态代理的方法
return ((Observable) getRetrofitMethod(service, method)
.invoke(service, args));
});
} else if (method.getReturnType() == Single.class) {
// 如果方法返回值是 Single 的话,则包一层再返回。
return Single.defer(() -> {
final T service = getRetrofitService(serviceClass);
// 执行真正的 Retrofit 动态代理的方法
return ((Single) getRetrofitMethod(service, method)
.invoke(service, args));
});
}
// 返回值不是 Observable 或 Single 的话不处理。
final T service = getRetrofitService(serviceClass);
return getRetrofitMethod(service, method).invoke(service, args);
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void deferReturnsNull() {
Single.defer(Functions.<Single<Object>>nullSupplier()).blockingGet();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normal() {
Single<Integer> s = Single.defer(new Callable<Single<Integer>>() {
int counter;
@Override
public Single<Integer> call() throws Exception {
return Single.just(++counter);
}
});
for (int i = 1; i < 33; i++) {
s.test().assertResult(i);
}
}
}
代码示例来源:origin: com.salesforce.servicelibs/rxgrpc-stub
@Override
public Single<O> apply(final Flowable<I> request) {
return Single.defer(new Callable<SingleSource<O>>() {
@Override
public SingleSource<O> call() throws Exception {
return operation.apply(request);
}
}).retryWhen(handler);
}
};
代码示例来源:origin: gentics/mesh
@Override
public Single<T> toSingle() {
return Single.defer(() -> invoke().rxSetHandler());
}
代码示例来源:origin: gentics/mesh
@Override
public Single<T> toSingle() {
return Single.defer(() -> invoke().rxSetHandler());
}
代码示例来源:origin: com.microsoft.rest.v2/client-runtime
@Override
public Single<HttpResponse> sendAsync(HttpRequest request) {
return Single.defer(() -> {
request.headers().set("Date", format.format(OffsetDateTime.now()));
return next.sendAsync(request);
});
}
}
代码示例来源:origin: VictorAlbertos/ReactiveCache
/**
* Read from cache by group and throw if no data is available.
*/
public final Single<T> read(final Object group) {
return Single.defer(() ->
Single.fromObservable(builder.processorProviders
.<T>process(getConfigProvider(exceptionAdapter.placeholderLoader(), group.toString(),
new EvictDynamicKeyGroup(false), false, null))))
.onErrorResumeNext(exceptionAdapter::stripPlaceholderLoaderException);
}
代码示例来源:origin: FroMage/redpipe
public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
return Single.defer(() -> {
Single<SQLConnection> connection = getConnection();
return connection.flatMap(conn -> {
return func.call(conn).doAfterTerminate(() -> {
conn.close();
});
});
});
}
代码示例来源:origin: net.redpipe/redpipe-engine
public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
return Single.defer(() -> {
Single<SQLConnection> connection = getConnection();
return connection.flatMap(conn -> {
return func.call(conn).doAfterTerminate(() -> {
conn.close();
});
});
});
}
代码示例来源:origin: com.microsoft.azure.v2/azure-client-runtime
@Override
public Observable<HttpResponse> call() {
return delayAsync()
.andThen(Single.defer(new Callable<Single<HttpResponse>>() {
@Override
public Single<HttpResponse> call() throws Exception {
final HttpRequest pollRequest = createPollRequest();
return restProxy.sendHttpRequestAsync(pollRequest);
}
}))
.flatMap(new Function<HttpResponse, Single<HttpResponse>>() {
@Override
public Single<HttpResponse> apply(HttpResponse response) {
return updateFromAsync(response);
}
})
.toObservable();
}
});
代码示例来源:origin: VictorAlbertos/ReactiveCache
/**
* Read from cache and throw if no data is available.
*/
public final Single<T> read() {
return Single.defer(() ->
Single.fromObservable(builder.processorProviders
.<T>process(getConfigProvider(exceptionAdapter.placeholderLoader(),
new EvictDynamicKey(false), false, null))))
.onErrorResumeNext(exceptionAdapter::stripPlaceholderLoaderException);
}
代码示例来源:origin: net.redpipe/redpipe-engine
protected Single<VertxResteasyDeployment> setupResteasy(Class<?>... resourceOrProviderClasses) {
return Single.defer(() -> {
// Build the Jax-RS hello world deployment
VertxResteasyDeployment deployment = new VertxResteasyDeployment();
deployment.getDefaultContextObjects().put(Vertx.class, AppGlobals.get().getVertx());
deployment.getDefaultContextObjects().put(AppGlobals.class, AppGlobals.get());
return doOnPlugins(plugin -> plugin.deployToResteasy(deployment)).toSingle(() -> {
for(Class<?> klass : resourceOrProviderClasses) {
if(klass.isAnnotationPresent(Path.class))
deployment.getActualResourceClasses().add(klass);
if(klass.isAnnotationPresent(Provider.class))
deployment.getActualProviderClasses().add(klass);
}
try {
deployment.start();
}catch(ExceptionInInitializerError err) {
// rxjava behaves badly on LinkageError
RedpipeUtil.rethrow(err.getCause());
}
return deployment;
}).doOnError(t -> t.printStackTrace());
});
}
代码示例来源:origin: FroMage/redpipe
protected Single<VertxResteasyDeployment> setupResteasy(Class<?>... resourceOrProviderClasses) {
return Single.defer(() -> {
// Build the Jax-RS hello world deployment
VertxResteasyDeployment deployment = new VertxResteasyDeployment();
deployment.getDefaultContextObjects().put(Vertx.class, AppGlobals.get().getVertx());
deployment.getDefaultContextObjects().put(AppGlobals.class, AppGlobals.get());
return doOnPlugins(plugin -> plugin.deployToResteasy(deployment)).toSingle(() -> {
for(Class<?> klass : resourceOrProviderClasses) {
if(klass.isAnnotationPresent(Path.class))
deployment.getActualResourceClasses().add(klass);
if(klass.isAnnotationPresent(Provider.class))
deployment.getActualProviderClasses().add(klass);
}
try {
deployment.start();
}catch(ExceptionInInitializerError err) {
// rxjava behaves badly on LinkageError
RedpipeUtil.rethrow(err.getCause());
}
return deployment;
}).doOnError(t -> t.printStackTrace());
});
}
代码示例来源:origin: akarnokd/akarnokd-misc
@SafeVarargs
public static <T> Single<T> latestSuccess(Single<T>... sources) {
return Single.defer(() -> {
AtomicReference<T> last = new AtomicReference<>();
return Observable.fromArray(sources)
.concatMap(source ->
source.doOnSuccess(last::lazySet)
.toObservable()
.onErrorResumeNext(Observable.empty())
)
.ignoreElements()
.andThen(Single.fromCallable(() -> {
if (last.get() == null) {
throw new NoSuchElementException();
}
return last.get();
}));
});
}
}
代码示例来源:origin: davidmoten/rxjava2-jdbc
@SuppressWarnings("unchecked")
private Single<TxWithoutValue> build() {
return Single.defer(() -> {
AtomicReference<Connection> con = new AtomicReference<Connection>();
// set the atomic reference when transactedConnection emits
Single<Connection> transactedConnection = b.connection //
.map(c -> Util.toTransactedConnection(con, c));
return Call //
.createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params) //
.materialize() //
.filter(x -> !x.isOnNext()) //
.<TxWithoutValue>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<Object>) tx).connection().commit();
}
}) //
.lastOrError();
});
}
代码示例来源:origin: com.github.davidmoten/rxjava2-jdbc
@SuppressWarnings("unchecked")
private Single<TxWithoutValue> build() {
return Single.defer(() -> {
AtomicReference<Connection> con = new AtomicReference<Connection>();
// set the atomic reference when transactedConnection emits
Single<Connection> transactedConnection = b.connection //
.map(c -> Util.toTransactedConnection(con, c));
return Call //
.createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params) //
.materialize() //
.filter(x -> !x.isOnNext()) //
.<TxWithoutValue>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<Object>) tx).connection().commit();
}
}) //
.lastOrError();
});
}
代码示例来源:origin: gentics/mesh
@Override
public Single<GenericMessageResponse> login(MeshRestClient meshRestClient) {
return Single.defer(() -> {
LoginRequest loginRequest = new LoginRequest();
loginRequest.setUsername(getUsername());
loginRequest.setPassword(getPassword());
return MeshRestRequestUtil.prepareRequest(HttpMethod.POST, "/auth/login", TokenResponse.class, loginRequest, meshRestClient, null, false).toSingle();
}).doOnSuccess(response -> token = response.getToken())
.map(ignore -> new GenericMessageResponse("OK"));
}
内容来源于网络,如有侵权,请联系作者删除!