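This article collects code examples for the Java method io.reactivex.Single.flatMapObservable() and shows how Single.flatMapObservable() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Single.flatMapObservable() are as follows: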
Package path: io.reactivex.Single
Class name: Single
Method name: flatMapObservable
Returns an Observable that is based on applying a specified function to the item emitted by the source Single, where that function returns an ObservableSource.
Scheduler: flatMapObservable does not operate by default on a particular Scheduler.
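The description above can be made concrete with a small, dependency-free sketch. It does not use RxJava: MiniSingle, its just() factory, and the expand() helper are hypothetical stand-ins invented here to model the contract only, namely that one upstream item goes in and the mapper's sequence comes out. A java.util.stream.Stream plays the role of the Observable, so the laziness, asynchrony, and error signalling of the real operator are not modeled.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class FlatMapObservableModel {

    /** Hypothetical stand-in for a Single: it holds exactly one item. */
    static final class MiniSingle<T> {
        private final T item;

        private MiniSingle(T item) {
            this.item = item;
        }

        static <T> MiniSingle<T> just(T item) {
            return new MiniSingle<>(item);
        }

        /**
         * Models the shape of flatMapObservable: the mapper receives the
         * single item and returns a sequence, which becomes the result.
         * A Stream stands in for the Observable here (an assumption of
         * this sketch, not the RxJava signature).
         */
        <R> Stream<R> flatMapObservable(Function<T, Stream<R>> mapper) {
            return mapper.apply(item);
        }
    }

    /** Expands one starting value into `count` consecutive integers. */
    static List<Integer> expand(int start, int count) {
        return MiniSingle.just(start)
                .flatMapObservable(v -> IntStream.range(v, v + count).boxed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(expand(1, 5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With the real library, the corresponding call is Single.just(1).flatMapObservable(v -> Observable.range(v, 5)), which one of the RxJava tests further down asserts yields 1, 2, 3, 4, 5.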
Code example source: Polidea/RxAndroidBle
@Override
public Observable<byte[]> build() {
    if (writtenCharacteristicObservable == null) {
        throw new IllegalArgumentException("setCharacteristicUuid() or setCharacteristic() needs to be called before build()");
    }
    if (bytes == null) {
        throw new IllegalArgumentException("setBytes() needs to be called before build()");
    }
    // TODO: [DS 24.05.2017] Think about a warning if specified maxBatchSize is greater than MTU
    return writtenCharacteristicObservable.flatMapObservable(new Function<BluetoothGattCharacteristic, Observable<byte[]>>() {
        @Override
        public Observable<byte[]> apply(BluetoothGattCharacteristic bluetoothGattCharacteristic) {
            return operationQueue.queue(
                    operationsProvider.provideLongWriteOperation(bluetoothGattCharacteristic,
                            writeOperationAckStrategy, writeOperationRetryStrategy, maxBatchSizeProvider, bytes)
            );
        }
    });
}
Code example source: Polidea/RxAndroidBle
@Override
public Observable<Observable<byte[]>> setupIndication(@NonNull UUID characteristicUuid,
                                                      @NonNull final NotificationSetupMode setupMode) {
    return getCharacteristic(characteristicUuid)
            .flatMapObservable(new Function<BluetoothGattCharacteristic, ObservableSource<? extends Observable<byte[]>>>() {
                @Override
                public Observable<? extends Observable<byte[]>> apply(BluetoothGattCharacteristic characteristic) {
                    return setupIndication(characteristic, setupMode);
                }
            });
}
Code example source: Polidea/RxAndroidBle
@Override
public Observable<Observable<byte[]>> setupNotification(@NonNull UUID characteristicUuid,
                                                        @NonNull final NotificationSetupMode setupMode) {
    return getCharacteristic(characteristicUuid)
            .flatMapObservable(new Function<BluetoothGattCharacteristic, ObservableSource<? extends Observable<byte[]>>>() {
                @Override
                public Observable<? extends Observable<byte[]>> apply(BluetoothGattCharacteristic characteristic) {
                    return setupNotification(characteristic, setupMode);
                }
            });
}
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
    Observable<Integer> source = Observable.just(1)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer t1) {
                return false;
            }
        })
        .flatMapObservable(new Function<Boolean, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Boolean t1) {
                return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
            }
        });
    assertEquals((Object)2, source.blockingFirst());
}
Code example source: pockethub/PocketHub
public static <B> Observable<Page<B>> getAllPages(
        GitHubRequest<Response<Page<B>>> pagedSingleCall, int i) {
    return pagedSingleCall.execute(i)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMapObservable(response -> {
                Page<B> page = response.body();
                // Last page: emit it and stop.
                if (page.next() == null) {
                    return Observable.just(page);
                }
                // Otherwise emit this page, then recursively fetch the remaining pages.
                return Observable.just(page)
                        .concatWith(getAllPages(pagedSingleCall, page.next()));
            });
}
Code example source: pockethub/PocketHub
private Observable<Page<NotificationThread>> getPageAndNext(int i) {
    return ServiceGenerator.createService(getActivity(), NotificationService.class)
            .getNotifications(filters, i)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMapObservable(response -> {
                Page<NotificationThread> page = response.body();
                if (page.next() == null) {
                    return Observable.just(page);
                }
                return Observable.just(page).concatWith(getPageAndNext(page.next()));
            });
}
Code example source: pockethub/PocketHub
Single<List<GitHubComment>> getGistComments() {
    return ServiceGenerator.createService(context, GistCommentService.class)
            .getGistComments(id, 0)
            // Unpack the page of comments into individual items.
            .flatMapObservable(response -> Observable.fromIterable(response.body().items()))
            .map(comment -> {
                imageGetter.encode(comment, comment.bodyHtml());
                return comment;
            })
            .toList();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
    Observable<Integer> source = Observable.just(1).isEmpty()
        .flatMapObservable(new Function<Boolean, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Boolean t1) {
                return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
            }
        });
    assertEquals((Object)2, source.blockingFirst());
}
Code example source: ReactiveX/RxJava
@Test
public void flatMapObservable() {
    Single.just(1).flatMapObservable(new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> apply(Integer v) throws Exception {
            return Observable.range(v, 5);
        }
    })
    .test()
    .assertResult(1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void mapperCrash() {
    Single.just(1).flatMapObservable(new Function<Integer, ObservableSource<? extends Object>>() {
        @Override
        public ObservableSource<? extends Object> apply(Integer v) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void isDisposed() {
    TestHelper.checkDisposed(Single.never().flatMapObservable(Functions.justFunction(Observable.never())));
}
Code example source: Polidea/RxAndroidBle
@Override
protected void subscribeActual(Observer<? super RxBleClient.State> observer) {
    if (!rxBleAdapterWrapper.hasBluetoothAdapter()) {
        observer.onSubscribe(Disposables.empty());
        observer.onComplete();
        return;
    }
    checkPermissionUntilGranted(locationServicesStatus, timerScheduler)
            .flatMapObservable(new Function<Boolean, Observable<RxBleClient.State>>() {
                @Override
                public Observable<RxBleClient.State> apply(Boolean permissionWasInitiallyGranted) {
                    return checkAdapterAndServicesState(
                            permissionWasInitiallyGranted,
                            rxBleAdapterWrapper,
                            bleAdapterStateObservable,
                            locationServicesOkObservable
                    );
                }
            })
            .distinctUntilChanged()
            .subscribe(observer);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldReleaseBulkheadOnlyOnce() {
    Single.just(Arrays.asList(1, 2, 3))
        .lift(BulkheadOperator.of(bulkhead))
        .flatMapObservable(Observable::fromIterable)
        .take(2) // this with the previous line triggers an extra dispose
        .test()
        .assertResult(1, 2);
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: io.github.jklingsporn/vertx-jooq-rx-jdbc
public static <T> Observable<T> executeBlockingObservable(Handler<Future<List<T>>> blockingCodeHandler, Vertx vertx) {
    return executeBlocking(blockingCodeHandler, vertx)
            .flatMapObservable(Observable::fromIterable);
}
Code example source: io.github.jklingsporn/vertx-jooq-async-rx
public static <T> Observable<T> executeBlockingObservable(Handler<Future<List<T>>> blockingCodeHandler, Vertx vertx) {
    return executeBlocking(blockingCodeHandler, vertx)
            .flatMapObservable(Observable::fromIterable);
}
Code example source: gravitee-io/graviteeio-access-management
@Override
public boolean upgrade() {
    logger.info("Applying domain idp upgrade");
    domainService.findAll()
            .flatMapObservable(Observable::fromIterable)
            .flatMapSingle(this::updateDefaultIdp)
            .subscribe();
    return true;
}
Code example source: tsegismont/vertx-musicstore
private Single<JsonObject> findArtist(SQLConnection sqlConnection, Long artistId) {
    return sqlConnection.rxQueryStreamWithParams(findArtistById, new JsonArray().add(artistId))
        .flatMapObservable(SQLRowStream::toObservable)
        .map(row -> new JsonObject().put("id", artistId).put("name", row.getString(0)))
        .singleOrError();
}
Code example source: tsegismont/vertx-musicstore
private Single<JsonArray> findGenres(SQLConnection sqlConnection) {
    return sqlConnection.rxQueryStream(findAllGenres)
        .flatMapObservable(SQLRowStream::toObservable)
        .map(row -> new JsonObject().put("id", row.getLong(0)).put("name", row.getString(1)))
        .collect(JsonArray::new, JsonArray::add);
}
Code example source: gravitee-io/graviteeio-access-management
@Override
public boolean upgrade() {
    logger.info("Applying scope upgrade");
    domainService.findAll()
            .flatMapObservable(domains -> Observable.fromIterable(domains))
            .flatMapSingle(domain -> upgradeDomain(domain))
            .subscribe();
    return true;
}
Code example source: io.vertx/vertx-rx-java2
private Observable<String> inTransaction(Exception e) throws Exception {
    return client.rxGetConnection().flatMapObservable(conn -> {
        return rxInsertExtraFolks(conn)
            .andThen(uniqueNames(conn).toObservable())
            .compose(upstream -> e == null ? upstream : upstream.concatWith(Observable.error(e)))
            .compose(SQLClientHelper.txObservableTransformer(conn))
            .concatWith(rxAssertAutoCommit(conn).toObservable())
            .doFinally(conn::close);
    });
}