Usage and code examples of the io.reactivex.Single.fromPublisher() method

Reposted by x33g5p2x on 2022-01-29 under Other

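This article collects code examples for the Java method io.reactivex.Single.fromPublisher() and shows how Single.fromPublisher() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected open-source projects, and are intended as a practical reference. Details of the method:
Package: io.reactivex
Class: Single
Method: fromPublisher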

Introduction to Single.fromPublisher

Wraps a specific Publisher into a Single and signals its single element or error.

If the source Publisher is empty, a NoSuchElementException is signalled. If the source has more than one element, an IndexOutOfBoundsException is signalled.

The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.

If possible, use Single.create(SingleOnSubscribe) to create a source-like Single instead.

Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda.

Backpressure: The publisher is consumed in an unbounded fashion but will be cancelled if it produced more than one item.

Scheduler: fromPublisher does not operate by default on a particular Scheduler.
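The signalling rules described above (one element succeeds, an empty source fails with NoSuchElementException, a second element cancels the source and fails with IndexOutOfBoundsException) can be mimicked with JDK classes only. The following is a rough sketch, not RxJava's implementation: it assumes Java 9+, uses java.util.concurrent.Flow in place of org.reactivestreams and CompletableFuture in place of Single, and the names ListPublisher, SingleLikeAdapter, toSingleLike, and SingleLikeDemo are hypothetical.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;

/** Hypothetical synchronous source: emits everything on the first request, stops on cancel(). */
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) { this.items = items; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (started) return;           // ignores n: assumes an unbounded request
                started = true;
                for (T item : items) {
                    if (cancelled) return;
                    subscriber.onNext(item);
                }
                if (!cancelled) subscriber.onComplete();
            }

            @Override
            public void cancel() { cancelled = true; }
        });
    }
}

final class SingleLikeAdapter {
    /** Hypothetical helper (not RxJava code): applies the documented rules to a Flow.Publisher. */
    static <T> CompletableFuture<T> toSingleLike(Flow.Publisher<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private T value;
            private boolean hasValue;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                s.request(Long.MAX_VALUE);     // unbounded consumption
            }

            @Override
            public void onNext(T item) {
                if (done) return;
                if (hasValue) {                // second element: cancel the source and fail
                    done = true;
                    upstream.cancel();
                    result.completeExceptionally(new IndexOutOfBoundsException("more than one element"));
                } else {
                    hasValue = true;
                    value = item;
                }
            }

            @Override
            public void onError(Throwable t) {
                if (!done) { done = true; result.completeExceptionally(t); }
            }

            @Override
            public void onComplete() {
                if (done) return;
                done = true;
                if (hasValue) result.complete(value);
                else result.completeExceptionally(new NoSuchElementException("empty source"));
            }
        });
        return result;
    }
}

final class SingleLikeDemo {
    /** Describes how the future ended: its value, or the simple class name of the failure. */
    static String outcome(CompletableFuture<?> future) {
        try {
            return "value " + future.join();
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints: value 1
        System.out.println(outcome(SingleLikeAdapter.toSingleLike(new ListPublisher<Integer>(List.of(1)))));
        // prints: NoSuchElementException
        System.out.println(outcome(SingleLikeAdapter.toSingleLike(new ListPublisher<Integer>(List.of()))));
        // prints: IndexOutOfBoundsException
        System.out.println(outcome(SingleLikeAdapter.toSingleLike(new ListPublisher<Integer>(List.of(1, 2, 3)))));
    }
}
```

In the three-element case the third item is never delivered, because the subscriber cancels as soon as the second one arrives. With RxJava itself, the same three outcomes are what the empty, just, and range tests in the examples section assert.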
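The note about not implementing Publisher through a lambda can be made concrete. A publisher has to track, per subscriber, whether it has already emitted or been cancelled, and a class is the natural place for that state. The sketch below is a minimal illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces, rather than org.reactivestreams.Publisher; SingleValuePublisher and PublisherStateDemo are hypothetical names; and it is not a fully specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical one-element publisher that keeps separate state for every subscriber. */
final class SingleValuePublisher<T> implements Flow.Publisher<T> {
    private final T value;

    SingleValuePublisher(T value) {
        this.value = value;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Each subscriber receives its own Subscription object with its own flag.
        subscriber.onSubscribe(new Flow.Subscription() {
            // true once the element was emitted, an error was signalled, or cancel() was called
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (n <= 0) {
                    // Non-positive demand is reported as an error.
                    if (done.compareAndSet(false, true)) {
                        subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                    }
                    return;
                }
                // Only the first valid request emits; later requests are ignored.
                if (done.compareAndSet(false, true)) {
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}

final class PublisherStateDemo {
    /** Subscribes, requests twice, and records every signal that arrives. */
    static List<String> signalsAfterTwoRequests(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
                subscription.request(1); // must not replay the element
            }

            @Override
            public void onNext(String item) { signals.add("onNext(" + item + ")"); }

            @Override
            public void onError(Throwable error) { signals.add("onError"); }

            @Override
            public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        SingleValuePublisher<String> publisher = new SingleValuePublisher<>("a");
        // Both subscriptions print [onNext(a), onComplete]: state is per subscriber.
        System.out.println(signalsAfterTwoRequests(publisher));
        System.out.println(signalsAfterTwoRequests(publisher));
    }
}
```

A lambda can still delegate to objects that hold such state, as the resilience4j example in the examples section does, but the state has to live somewhere outside the lambda body.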

Code examples

Code example source: lettuce-io/lettuce-core

// Wraps the given Publisher in an RxJava 2 Single
@Override
public io.reactivex.Single<?> apply(Publisher<?> source) {
  return io.reactivex.Single.fromPublisher(source);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromPublisherNull() {
  Single.fromPublisher(null);
}

Code example source: ReactiveX/RxJava

@Test
public void isDisposed() {
  TestHelper.checkDisposed(Single.fromPublisher(Flowable.never()));
}

Code example source: micronaut-projects/micronaut-core

@SuppressWarnings("unchecked")
@Override
public BindingResult<Single> bind(ArgumentConversionContext<Single> context, HttpRequest<?> source) {
  Collection<Argument<?>> typeVariables = context.getArgument().getTypeVariables().values();

  // Bind the request body as a Publisher first, reusing the Single's type variables
  BindingResult<Publisher> result = publisherBodyBinder.bind(
    ConversionContext.of(Argument.of(Publisher.class, (Argument[]) typeVariables.toArray(new Argument[0]))),
    source
  );
  if (result.isPresentAndSatisfied()) {
    // Wrap the bound Publisher in a Single
    return () -> Optional.of(Single.fromPublisher(result.get()));
  }
  return BindingResult.EMPTY;
}

Code example source: resilience4j/resilience4j

@Override
public SingleSource<T> apply(Single<T> upstream) {
  // The Publisher is written as a lambda here; the per-subscription state
  // lives in the SubscriptionArbiter and RetrySubscriber objects it creates.
  return Single.fromPublisher(downstream -> {
    Flowable<T> flowable = upstream.toFlowable();
    SubscriptionArbiter sa = new SubscriptionArbiter();
    downstream.onSubscribe(sa);
    RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
    flowable.subscribe(retrySubscriber);
  });
}

Code example source: micronaut-projects/micronaut-core

// Excerpt: the source omits the lines between and after the two fragments below
Single<HttpStatus> passPublisher = Single.fromPublisher(consulClient.pass(checkId));
passPublisher.subscribe((httpStatus, throwable) -> {
  if (throwable == null) {
    Single.fromPublisher(consulClient.getServiceIds()).subscribe((serviceIds, throwable1) -> {
      if (throwable1 == null) {
        String serviceId = idGenerator.generateId(environment, instance);
        // ... (omitted in the source excerpt)
} else {
  Single<HttpStatus> failPublisher = Single.fromPublisher(consulClient.fail(checkId, status.getDescription().orElse(null)));
  failPublisher.subscribe((httpStatus, throwable) -> {
    if (throwable == null) {
      // ... (truncated in the source excerpt)

Code example source: ReactiveX/RxJava

@Test
public void empty() {
  Single.fromPublisher(Flowable.empty())
  .test()
  .assertFailure(NoSuchElementException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void just() {
  Single.fromPublisher(Flowable.just(1))
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void fromPublisher() {
  Single.fromPublisher(Flowable.just(1))
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void range() {
  Single.fromPublisher(Flowable.range(1, 3))
  .test()
  .assertFailure(IndexOutOfBoundsException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Single.fromPublisher(Flowable.error(new TestException()))
  .test()
  .assertFailure(TestException.class);
}

Code example source: micronaut-projects/micronaut-core

Single.fromPublisher(finalPublisher).subscribeOn(Schedulers.from(channel.eventLoop())).subscribe((BiConsumer<MutableHttpResponse<?>, Throwable>) (actualResponse, throwable) -> {
  if (throwable != null) {
    ctx.fireExceptionCaught(throwable);
    // ... (truncated in the source excerpt)

Code example source: micronaut-projects/micronaut-core

InstanceInfo instanceInfo = registration.getInstanceInfo();
if (status.equals(HealthStatus.UP)) {
  Single<HttpStatus> heartbeatPublisher = Single.fromPublisher(eurekaClient.heartbeat(instanceInfo.getApp(), instanceInfo.getId()));
  // ... (truncated in the source excerpt)

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  TestObserver<Integer> to = Single.fromPublisher(pp).test();
  assertTrue(pp.hasSubscribers());
  pp.onNext(1);
  to.cancel();
  assertFalse(pp.hasSubscribers());
}

Code example source: ReactiveX/RxJava

@Test
public void badSource() {
  List<Throwable> errors = TestHelper.trackPluginErrors();

  try {
    // This source deliberately breaks the protocol: it calls onSubscribe twice
    // and keeps signalling after onComplete.
    Single.fromPublisher(new Flowable<Integer>() {
      @Override
      protected void subscribeActual(Subscriber<? super Integer> s) {
        s.onSubscribe(new BooleanSubscription());
        BooleanSubscription s2 = new BooleanSubscription();
        s.onSubscribe(s2);
        assertTrue(s2.isCancelled());

        s.onNext(1);
        s.onComplete();
        s.onNext(2);
        s.onError(new TestException());
        s.onComplete();
      }
    })
    .test()
    .assertResult(1);

    TestHelper.assertError(errors, 0, IllegalStateException.class, "Subscription already set!");
    TestHelper.assertUndeliverable(errors, 1, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: io.micronaut/management

/**
 * @return the loggers as a {@link Single}
 */
@Read
public Single<Map<String, Object>> loggers() {
  return Single.fromPublisher(loggersManager.getLoggers(loggingSystem));
}

Code example source: io.micronaut/micronaut-management

/**
 * @param name The name of the logger to find
 * @return the {@link LogLevel} (both configured and effective) of the named logger
 */
@Read
public Single<Map<String, Object>> logger(@NotBlank @Selector String name) {
  return Single.fromPublisher(loggersManager.getLogger(loggingSystem, name));
}

Code example source: gravitee-io/graviteeio-access-management

@Override
public Single<Form> update(Form item) {
  FormMongo page = convert(item);
  return Single.fromPublisher(formsCollection.replaceOne(eq(FIELD_ID, page.getId()), page)).flatMap(updateResult -> findById(page.getId()).toSingle());
}

Code example source: gravitee-io/graviteeio-access-management

@Override
public Single<ExtensionGrant> create(ExtensionGrant item) {
  ExtensionGrantMongo extensionGrant = convert(item);
  extensionGrant.setId(extensionGrant.getId() == null ? (String) idGenerator.generate() : extensionGrant.getId());
  return Single.fromPublisher(extensionGrantsCollection.insertOne(extensionGrant)).flatMap(success -> findById(extensionGrant.getId()).toSingle());
}

Code example source: gravitee-io/graviteeio-access-management

@Override
public Single<Role> create(Role item) {
  RoleMongo role = convert(item);
  role.setId(role.getId() == null ? (String) idGenerator.generate() : role.getId());
  return Single.fromPublisher(rolesCollection.insertOne(role)).flatMap(success -> findById(role.getId()).toSingle());
}
