io.reactivex.Single.defer(): usage and code examples

Reposted by x33g5p2x on 2022-01-29, filed under "Other"

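This article collects code examples for the Java method io.reactivex.Single.defer() and shows how Single.defer() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms and were extracted from selected projects, so they should serve as useful references. Details of the method:
Fully qualified class: io.reactivex.Single
Class name: Single
Method name: defer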

About Single.defer

Calls a Callable for each individual SingleObserver to return the actual Single source to be subscribed to. Scheduler: defer does not operate by default on a particular Scheduler.
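
To make the per-observer behaviour described above concrete, here is a minimal sketch in plain Java. It does not use RxJava: `MiniSingle`, `just`, and `defer` in the `DeferSketch` class are hypothetical stand-ins written only for illustration, and RxJava's real implementation differs (disposables, schedulers, plugin hooks). The sketch only shows the core idea: the factory is not called when `defer` is called, it is called once for every subscriber.

```java
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

final class DeferSketch {

    /** Hypothetical single-value source; a simplified stand-in, not RxJava's Single. */
    interface MiniSingle<T> {
        void subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError);
    }

    /** Emits a value that was already computed when just() was called. */
    static <T> MiniSingle<T> just(T value) {
        return (onSuccess, onError) -> onSuccess.accept(value);
    }

    /** Postpones creating the actual source until someone subscribes. */
    static <T> MiniSingle<T> defer(Callable<MiniSingle<T>> factory) {
        Objects.requireNonNull(factory, "factory is null");
        return (onSuccess, onError) -> {
            MiniSingle<T> actual;
            try {
                // Invoked once per subscriber, never at assembly time.
                actual = factory.call();
                if (actual == null) {
                    throw new NullPointerException("factory returned null");
                }
            } catch (Throwable t) {
                // A failing factory is reported to this subscriber as an error.
                onError.accept(t);
                return;
            }
            actual.subscribe(onSuccess, onError);
        };
    }

    public static void main(String[] args) {
        int[] counter = {0};
        MiniSingle<Integer> s = defer(() -> just(++counter[0]));

        // Nothing has run yet; each subscribe() below triggers a fresh factory call.
        s.subscribe(v -> System.out.println(v), Throwable::printStackTrace); // prints 1
        s.subscribe(v -> System.out.println(v), Throwable::printStackTrace); // prints 2
    }
}
```

In this sketch a null factory is rejected immediately, while a factory that returns null is reported through the error callback of the subscriber that triggered it.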

Code examples

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void deferNull() {
  Single.defer(null);
}

Example source: Polidea/RxAndroidBle (the excerpt is truncated in the source)

final Scheduler timeoutScheduler
) {
  return Single.defer(new Callable<SingleSource<? extends RxBleDeviceServices>>() {
    @Override
    public SingleSource<? extends RxBleDeviceServices> call() throws Exception {

Example source: JessYanCoding/MVPArms

@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args)
    throws Throwable {
  // Triggered whenever a method of serviceClass is invoked.
  if (method.getReturnType() == Observable.class) {
    // If the method returns an Observable, wrap it in one extra layer before returning.
    // Wrapping only in defer leaves it to the caller to decide which thread the slow
    // call and the network request run on, so the impact on the original project is
    // zero and the behavior is easier to control.
    return Observable.defer(() -> {
      final T service = getRetrofitService(serviceClass);
      // Invoke the real Retrofit dynamic-proxy method.
      return ((Observable) getRetrofitMethod(service, method)
          .invoke(service, args));
    });
  } else if (method.getReturnType() == Single.class) {
    // If the method returns a Single, wrap it the same way.
    return Single.defer(() -> {
      final T service = getRetrofitService(serviceClass);
      // Invoke the real Retrofit dynamic-proxy method.
      return ((Single) getRetrofitMethod(service, method)
          .invoke(service, args));
    });
  }
  // Any other return type is passed through without wrapping.
  final T service = getRetrofitService(serviceClass);
  return getRetrofitMethod(service, method).invoke(service, args);
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void deferReturnsNull() {
  Single.defer(Functions.<Single<Object>>nullSupplier()).blockingGet();
}

Example source: ReactiveX/RxJava

@Test
public void normal() {
  // The Callable runs once per subscriber, so every test() call sees the next counter value.
  Single<Integer> s = Single.defer(new Callable<Single<Integer>>() {
    int counter;

    @Override
    public Single<Integer> call() throws Exception {
      return Single.just(++counter);
    }
  });

  for (int i = 1; i < 33; i++) {
    s.test().assertResult(i);
  }
}

Example source: com.salesforce.servicelibs/rxgrpc-stub

@Override
public Single<O> apply(final Flowable<I> request) {
  // Because of defer, each resubscription triggered by retryWhen calls operation.apply(request) again.
  return Single.defer(new Callable<SingleSource<O>>() {
    @Override
    public SingleSource<O> call() throws Exception {
      return operation.apply(request);
    }
  }).retryWhen(handler);
}

Example source: gentics/mesh

@Override
public Single<T> toSingle() {
  return Single.defer(() -> invoke().rxSetHandler());
}

Example source: com.microsoft.rest.v2/client-runtime

@Override
public Single<HttpResponse> sendAsync(HttpRequest request) {
  // The Date header is computed at subscription time, not when sendAsync is called.
  return Single.defer(() -> {
    request.headers().set("Date", format.format(OffsetDateTime.now()));
    return next.sendAsync(request);
  });
}

Example source: VictorAlbertos/ReactiveCache

/**
 * Read from cache by group and throw if no data is available.
 */
public final Single<T> read(final Object group) {
 return Single.defer(() ->
   Single.fromObservable(builder.processorProviders
     .<T>process(getConfigProvider(exceptionAdapter.placeholderLoader(), group.toString(),
       new EvictDynamicKeyGroup(false), false, null))))
   .onErrorResumeNext(exceptionAdapter::stripPlaceholderLoaderException);
}

Example source: FroMage/redpipe

public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
  return Single.defer(() -> {
    Single<SQLConnection> connection = getConnection();
    return connection.flatMap(conn -> {
      return func.call(conn).doAfterTerminate(() -> {
        conn.close();
      });
    });
  });
}

Example source: net.redpipe/redpipe-engine (identical to the FroMage/redpipe example above)

Example source: com.microsoft.azure.v2/azure-client-runtime

@Override
public Observable<HttpResponse> call() {
  return delayAsync()
      // The poll request is created only after the delay completes and the Single is subscribed.
      .andThen(Single.defer(new Callable<Single<HttpResponse>>() {
        @Override
        public Single<HttpResponse> call() throws Exception {
          final HttpRequest pollRequest = createPollRequest();
          return restProxy.sendHttpRequestAsync(pollRequest);
        }
      }))
      .flatMap(new Function<HttpResponse, Single<HttpResponse>>() {
        @Override
        public Single<HttpResponse> apply(HttpResponse response) {
          return updateFromAsync(response);
        }
      })
      .toObservable();
}

Example source: VictorAlbertos/ReactiveCache

/**
 * Read from cache and throw if no data is available.
 */
public final Single<T> read() {
 return Single.defer(() ->
   Single.fromObservable(builder.processorProviders
     .<T>process(getConfigProvider(exceptionAdapter.placeholderLoader(),
       new EvictDynamicKey(false), false, null))))
   .onErrorResumeNext(exceptionAdapter::stripPlaceholderLoaderException);
}

Example source: net.redpipe/redpipe-engine

protected Single<VertxResteasyDeployment> setupResteasy(Class<?>... resourceOrProviderClasses) {
  return Single.defer(() -> {
    // Build the Jax-RS hello world deployment
    VertxResteasyDeployment deployment = new VertxResteasyDeployment();
    deployment.getDefaultContextObjects().put(Vertx.class, AppGlobals.get().getVertx());
    deployment.getDefaultContextObjects().put(AppGlobals.class, AppGlobals.get());
    return doOnPlugins(plugin -> plugin.deployToResteasy(deployment)).toSingle(() -> {
      for(Class<?> klass : resourceOrProviderClasses) {
        if(klass.isAnnotationPresent(Path.class))
          deployment.getActualResourceClasses().add(klass);
        if(klass.isAnnotationPresent(Provider.class))
          deployment.getActualProviderClasses().add(klass);
      }
      try {
        deployment.start();
      }catch(ExceptionInInitializerError err) {
        // rxjava behaves badly on LinkageError
        RedpipeUtil.rethrow(err.getCause());
      }
      return deployment;
    }).doOnError(t -> t.printStackTrace());
  });
}

Example source: FroMage/redpipe (identical to the net.redpipe/redpipe-engine example above)

Example source: akarnokd/akarnokd-misc

@SafeVarargs
public static <T> Single<T> latestSuccess(Single<T>... sources) {
  return Single.defer(() -> {
    // A fresh AtomicReference per subscriber keeps subscriptions independent of each other.
    AtomicReference<T> last = new AtomicReference<>();
    return Observable.fromArray(sources)
        .concatMap(source ->
            source.doOnSuccess(last::lazySet)
                .toObservable()
                .onErrorResumeNext(Observable.empty())
        )
        .ignoreElements()
        .andThen(Single.fromCallable(() -> {
          if (last.get() == null) {
            throw new NoSuchElementException();
          }
          return last.get();
        }));
  });
}

Example source: davidmoten/rxjava2-jdbc

@SuppressWarnings("unchecked")
private Single<TxWithoutValue> build() {
  return Single.defer(() -> {
    AtomicReference<Connection> con = new AtomicReference<Connection>();
    // set the atomic reference when transactedConnection emits
    Single<Connection> transactedConnection = b.connection //
        .map(c -> Util.toTransactedConnection(con, c));
    return Call //
        .createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params) //
        .materialize() //
        .filter(x -> !x.isOnNext()) //
        .<TxWithoutValue>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
        .doOnNext(tx -> {
          if (tx.isComplete()) {
            ((TxImpl<Object>) tx).connection().commit();
          }
        }) //
        .lastOrError();
  });
}

Example source: com.github.davidmoten/rxjava2-jdbc (identical to the davidmoten/rxjava2-jdbc example above)

Example source: gentics/mesh

@Override
public Single<GenericMessageResponse> login(MeshRestClient meshRestClient) {
  return Single.defer(() -> {
    LoginRequest loginRequest = new LoginRequest();
    loginRequest.setUsername(getUsername());
    loginRequest.setPassword(getPassword());
    return MeshRestRequestUtil.prepareRequest(HttpMethod.POST, "/auth/login", TokenResponse.class, loginRequest, meshRestClient, null, false).toSingle();
  }).doOnSuccess(response -> token = response.getToken())
  .map(ignore -> new GenericMessageResponse("OK"));
}
