Usage and code examples of the io.reactivex.Single.flatMapObservable() method

x33g5p2x, reposted on 2022-01-29 under Other

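This article collects code examples of the Java method io.reactivex.Single.flatMapObservable() and shows how Single.flatMapObservable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they can serve as a practical reference. Details of the Single.flatMapObservable() method:
Package: io.reactivex
Class: Single
Method: flatMapObservable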

About Single.flatMapObservable

Returns an Observable that is based on applying a specified function to the item emitted by the source Single, where that function returns an ObservableSource.
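
The description above can be illustrated with a minimal sketch, assuming RxJava 2 (the io.reactivex package) is on the classpath. The class name FlatMapObservableSketch and the method expand are hypothetical names made up for this illustration and do not come from any of the quoted projects.

```java
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.List;

// Hypothetical illustration class; not taken from any project quoted on this page.
public class FlatMapObservableSketch {

    // Maps the one item emitted by the source Single to an Observable
    // and collects everything that Observable emits into a list.
    public static List<Integer> expand(int start) {
        return Single.just(start)
                // The mapper is invoked once, with the Single's item.
                .flatMapObservable(v -> Observable.range(v, 3))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(expand(5)); // prints [5, 6, 7]
    }
}
```

The blocking calls are used only to keep the sketch short; in application code the resulting Observable would normally be subscribed to instead.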

Scheduler: flatMapObservable does not operate by default on a particular Scheduler.
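
One way to see what the Scheduler note means in practice is to record the thread on which the mapper function runs. The following is a sketch under the assumption that RxJava 2 is on the classpath; FlatMapObservableThreadSketch and mapperThread are hypothetical names introduced only for this illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

// Hypothetical illustration class; not taken from any project quoted on this page.
public class FlatMapObservableThreadSketch {

    // Returns the name of the thread on which the mapper function ran.
    public static String mapperThread(boolean moveOffCaller) {
        Single<Integer> source = Single.just(1);
        if (moveOffCaller) {
            // An explicit choice made by the caller; flatMapObservable itself adds no Scheduler.
            source = source.subscribeOn(Schedulers.newThread());
        }
        return source
                .flatMapObservable(v -> Observable.just(Thread.currentThread().getName()))
                .blockingFirst();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(caller.equals(mapperThread(false))); // prints true
        System.out.println(caller.equals(mapperThread(true)));  // prints false
    }
}
```

Without subscribeOn the mapper runs on the subscribing thread; any thread hop comes from operators the caller adds around flatMapObservable.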

Code examples

Code example source: Polidea/RxAndroidBle

@Override
public Observable<byte[]> build() {
  if (writtenCharacteristicObservable == null) {
    throw new IllegalArgumentException("setCharacteristicUuid() or setCharacteristic() needs to be called before build()");
  }

  if (bytes == null) {
    throw new IllegalArgumentException("setBytes() needs to be called before build()");
  }

  // TODO: [DS 24.05.2017] Think about a warning if specified maxBatchSize is greater than MTU

  return writtenCharacteristicObservable.flatMapObservable(new Function<BluetoothGattCharacteristic, Observable<byte[]>>() {
    @Override
    public Observable<byte[]> apply(BluetoothGattCharacteristic bluetoothGattCharacteristic) {
      return operationQueue.queue(
          operationsProvider.provideLongWriteOperation(bluetoothGattCharacteristic,
              writeOperationAckStrategy, writeOperationRetryStrategy, maxBatchSizeProvider, bytes)
      );
    }
  });
}

Code example source: Polidea/RxAndroidBle

@Override
public Observable<Observable<byte[]>> setupIndication(@NonNull UUID characteristicUuid,
                           @NonNull final NotificationSetupMode setupMode) {
  return getCharacteristic(characteristicUuid)
      .flatMapObservable(new Function<BluetoothGattCharacteristic, ObservableSource<? extends Observable<byte[]>>>() {
        @Override
        public Observable<? extends Observable<byte[]>> apply(BluetoothGattCharacteristic characteristic) {
          return setupIndication(characteristic, setupMode);
        }
      });
}

Code example source: Polidea/RxAndroidBle

@Override
public Observable<Observable<byte[]>> setupNotification(@NonNull UUID characteristicUuid,
                            @NonNull final NotificationSetupMode setupMode) {
  return getCharacteristic(characteristicUuid)
      .flatMapObservable(new Function<BluetoothGattCharacteristic, ObservableSource<? extends Observable<byte[]>>>() {
        @Override
        public Observable<? extends Observable<byte[]>> apply(BluetoothGattCharacteristic characteristic) {
          return setupNotification(characteristic, setupMode);
        }
      });
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
  Observable<Integer> source = Observable.just(1)
    .all(new Predicate<Integer>() {
      @Override
      public boolean test(Integer t1) {
        return false;
      }
    })
    .flatMapObservable(new Function<Boolean, Observable<Integer>>() {
      @Override
      public Observable<Integer> apply(Boolean t1) {
        return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
      }
    });
  assertEquals((Object)2, source.blockingFirst());
}

Code example source: pockethub/PocketHub

public static <B> Observable<Page<B>> getAllPages(
    GitHubRequest<Response<Page<B>>> pagedSingleCall, int i) {

  return pagedSingleCall.execute(i)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .flatMapObservable(response -> {
        Page<B> page = response.body();
        if (page.next() == null) {
          return Observable.just(page);
        }

        return Observable.just(page)
            .concatWith(getAllPages(pagedSingleCall, page.next()));
      });
}

Code example source: pockethub/PocketHub

private Observable<Page<NotificationThread>> getPageAndNext(int i) {
  return ServiceGenerator.createService(getActivity(), NotificationService.class)
      .getNotifications(filters, i)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .flatMapObservable(response -> {
        Page<NotificationThread> page = response.body();
        if (page.next() == null) {
          return Observable.just(page);
        }
        return Observable.just(page).concatWith(getPageAndNext(page.next()));
      });
}

Code example source: pockethub/PocketHub

Single<List<GitHubComment>> getGistComments() {
  return ServiceGenerator.createService(context, GistCommentService.class)
      .getGistComments(id, 0)
      .flatMapObservable(response -> Observable.fromIterable(response.body().items()))
      .map(comment -> {
        imageGetter.encode(comment, comment.bodyHtml());
        return comment;
      })
      .toList();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void testIssue1935NoUnsubscribeDownstream() {
  Observable<Integer> source = Observable.just(1).isEmpty()
    .flatMapObservable(new Function<Boolean, Observable<Integer>>() {
      @Override
      public Observable<Integer> apply(Boolean t1) {
        return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
      }
    });
  assertEquals((Object)2, source.blockingFirst());
}

Code example source: ReactiveX/RxJava

@Test
public void flatMapObservable() {
  Single.just(1).flatMapObservable(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer v) throws Exception {
      return Observable.range(v, 5);
    }
  })
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void mapperCrash() {
  Single.just(1).flatMapObservable(new Function<Integer, ObservableSource<? extends Object>>() {
    @Override
    public ObservableSource<? extends Object> apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void isDisposed() {
  TestHelper.checkDisposed(Single.never().flatMapObservable(Functions.justFunction(Observable.never())));
}

Code example source: Polidea/RxAndroidBle

@Override
protected void subscribeActual(Observer<? super RxBleClient.State> observer) {
  if (!rxBleAdapterWrapper.hasBluetoothAdapter()) {
    observer.onSubscribe(Disposables.empty());
    observer.onComplete();
    return;
  }

  checkPermissionUntilGranted(locationServicesStatus, timerScheduler)
      .flatMapObservable(new Function<Boolean, Observable<RxBleClient.State>>() {
        @Override
        public Observable<RxBleClient.State> apply(Boolean permissionWasInitiallyGranted) {
          return checkAdapterAndServicesState(
              permissionWasInitiallyGranted,
              rxBleAdapterWrapper,
              bleAdapterStateObservable,
              locationServicesOkObservable
          );
        }
      })
      .distinctUntilChanged()
      .subscribe(observer);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldReleaseBulkheadOnlyOnce() {
  Single.just(Arrays.asList(1, 2, 3))
    .lift(BulkheadOperator.of(bulkhead))
    .flatMapObservable(Observable::fromIterable)
    .take(2) // this with the previous line triggers an extra dispose
    .test()
    .assertResult(1, 2);

  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: io.github.jklingsporn/vertx-jooq-rx-jdbc

public static <T> Observable<T> executeBlockingObservable(Handler<Future<List<T>>> blockingCodeHandler, Vertx vertx) {
  return executeBlocking(blockingCodeHandler, vertx)
      .flatMapObservable(Observable::fromIterable);
}

Code example source: io.github.jklingsporn/vertx-jooq-async-rx

public static <T> Observable<T> executeBlockingObservable(Handler<Future<List<T>>> blockingCodeHandler, Vertx vertx) {
  return executeBlocking(blockingCodeHandler, vertx)
      .flatMapObservable(Observable::fromIterable);
}

Code example source: gravitee-io/graviteeio-access-management

@Override
public boolean upgrade() {
  logger.info("Applying domain idp upgrade");
  domainService.findAll()
      .flatMapObservable(Observable::fromIterable)
      .flatMapSingle(this::updateDefaultIdp)
      .subscribe();
  return true;
}

Code example source: tsegismont/vertx-musicstore

private Single<JsonObject> findArtist(SQLConnection sqlConnection, Long artistId) {
  return sqlConnection.rxQueryStreamWithParams(findArtistById, new JsonArray().add(artistId))
    .flatMapObservable(SQLRowStream::toObservable)
    .map(row -> new JsonObject().put("id", artistId).put("name", row.getString(0)))
    .singleOrError();
}

Code example source: tsegismont/vertx-musicstore

private Single<JsonArray> findGenres(SQLConnection sqlConnection) {
  return sqlConnection.rxQueryStream(findAllGenres)
    .flatMapObservable(SQLRowStream::toObservable)
    .map(row -> new JsonObject().put("id", row.getLong(0)).put("name", row.getString(1)))
    .collect(JsonArray::new, JsonArray::add);
}

Code example source: gravitee-io/graviteeio-access-management

@Override
public boolean upgrade() {
  logger.info("Applying scope upgrade");
  domainService.findAll()
      .flatMapObservable(domains -> Observable.fromIterable(domains))
      .flatMapSingle(domain -> upgradeDomain(domain))
      .subscribe();
  return true;
}

Code example source: io.vertx/vertx-rx-java2

private Observable<String> inTransaction(Exception e) throws Exception {
  return client.rxGetConnection().flatMapObservable(conn -> {
    return rxInsertExtraFolks(conn)
      .andThen(uniqueNames(conn).toObservable())
      .compose(upstream -> e == null ? upstream : upstream.concatWith(Observable.error(e)))
      .compose(SQLClientHelper.txObservableTransformer(conn))
      .concatWith(rxAssertAutoCommit(conn).toObservable())
      .doFinally(conn::close);
  });
}
