本文整理了Java中io.reactivex.Single.fromPublisher()
方法的一些代码示例,展示了Single.fromPublisher()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.fromPublisher()
方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:fromPublisher
[英]Wraps a specific Publisher into a Single and signals its single element or error.
If the source Publisher is empty, a NoSuchElementException is signalled. If the source has more than one element, an IndexOutOfBoundsException is signalled.
The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.
If possible, use #create(SingleOnSubscribe) to create a source-like Single instead.
Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda. Backpressure: The publisher is consumed in an unbounded fashion but will be cancelled if it produced more than one item. Scheduler: fromPublisher does not operate by default on a particular Scheduler.
[中]将特定的发布者包装为单个,并发出其单个元素或错误的信号。
如果源发布服务器为空,则会发出NoTouchElementException信号。如果源有多个元素,则会发出IndexOutOfBoundsException信号。
发布者必须遵循Reactive-Streams specification。违反规范可能会导致未定义的行为。
如果可能,使用#create(SingleOnSubscribe)创建类似Single的源代码。
请注意,尽管Publisher似乎是一个功能接口,但不建议通过lambda实现它,因为该规范需要状态管理,而无状态lambda是无法实现的。背压:出版商以无限制的方式消费,但如果它生产了多个项目,就会被取消。调度器:默认情况下,fromPublisher不会在特定的调度器上运行。
代码示例来源:origin: lettuce-io/lettuce-core
@Override
public io.reactivex.Single<?> apply(Publisher<?> source) {
return io.reactivex.Single.fromPublisher(source);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromPublisherNull() {
Single.fromPublisher(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void isDisposed() {
TestHelper.checkDisposed(Single.fromPublisher(Flowable.never()));
}
代码示例来源:origin: micronaut-projects/micronaut-core
@SuppressWarnings("unchecked")
@Override
public BindingResult<Single> bind(ArgumentConversionContext<Single> context, HttpRequest<?> source) {
Collection<Argument<?>> typeVariables = context.getArgument().getTypeVariables().values();
BindingResult<Publisher> result = publisherBodyBinder.bind(
ConversionContext.of(Argument.of(Publisher.class, (Argument[]) typeVariables.toArray(new Argument[0]))),
source
);
if (result.isPresentAndSatisfied()) {
return () -> Optional.of(Single.fromPublisher(result.get()));
}
return BindingResult.EMPTY;
}
}
代码示例来源:origin: resilience4j/resilience4j
@Override
public SingleSource<T> apply(Single<T> upstream) {
return Single.fromPublisher(downstream -> {
Flowable<T> flowable = upstream.toFlowable();
SubscriptionArbiter sa = new SubscriptionArbiter();
downstream.onSubscribe(sa);
RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
flowable.subscribe(retrySubscriber);
});
}
代码示例来源:origin: micronaut-projects/micronaut-core
Single<HttpStatus> passPublisher = Single.fromPublisher(consulClient.pass(checkId));
passPublisher.subscribe((httpStatus, throwable) -> {
if (throwable == null) {
Single.fromPublisher(consulClient.getServiceIds()).subscribe((serviceIds, throwable1) -> {
if (throwable1 == null) {
String serviceId = idGenerator.generateId(environment, instance);
} else {
Single<HttpStatus> failPublisher = Single.fromPublisher(consulClient.fail(checkId, status.getDescription().orElse(null)));
failPublisher.subscribe((httpStatus, throwable) -> {
if (throwable == null) {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void empty() {
Single.fromPublisher(Flowable.empty())
.test()
.assertFailure(NoSuchElementException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void just() {
Single.fromPublisher(Flowable.just(1))
.test()
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fromPublisher() {
Single.fromPublisher(Flowable.just(1))
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void range() {
Single.fromPublisher(Flowable.range(1, 3))
.test()
.assertFailure(IndexOutOfBoundsException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
Single.fromPublisher(Flowable.error(new TestException()))
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: micronaut-projects/micronaut-core
Single.fromPublisher(finalPublisher).subscribeOn(Schedulers.from(channel.eventLoop())).subscribe((BiConsumer<MutableHttpResponse<?>, Throwable>) (actualResponse, throwable) -> {
if (throwable != null) {
ctx.fireExceptionCaught(throwable);
代码示例来源:origin: micronaut-projects/micronaut-core
InstanceInfo instanceInfo = registration.getInstanceInfo();
if (status.equals(HealthStatus.UP)) {
Single<HttpStatus> heartbeatPublisher = Single.fromPublisher(eurekaClient.heartbeat(instanceInfo.getApp(), instanceInfo.getId()));
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestObserver<Integer> to = Single.fromPublisher(pp).test();
assertTrue(pp.hasSubscribers());
pp.onNext(1);
to.cancel();
assertFalse(pp.hasSubscribers());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void badSource() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Single.fromPublisher(new Flowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
BooleanSubscription s2 = new BooleanSubscription();
s.onSubscribe(s2);
assertTrue(s2.isCancelled());
s.onNext(1);
s.onComplete();
s.onNext(2);
s.onError(new TestException());
s.onComplete();
}
})
.test()
.assertResult(1);
TestHelper.assertError(errors, 0, IllegalStateException.class, "Subscription already set!");
TestHelper.assertUndeliverable(errors, 1, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}
代码示例来源:origin: io.micronaut/management
/**
* @return the loggers as a {@link Single}
*/
@Read
public Single<Map<String, Object>> loggers() {
return Single.fromPublisher(loggersManager.getLoggers(loggingSystem));
}
代码示例来源:origin: io.micronaut/micronaut-management
/**
* @param name The name of the logger to find
* @return the {@link LogLevel} (both configured and effective) of the named logger
*/
@Read
public Single<Map<String, Object>> logger(@NotBlank @Selector String name) {
return Single.fromPublisher(loggersManager.getLogger(loggingSystem, name));
}
代码示例来源:origin: gravitee-io/graviteeio-access-management
@Override
public Single<Form> update(Form item) {
FormMongo page = convert(item);
return Single.fromPublisher(formsCollection.replaceOne(eq(FIELD_ID, page.getId()), page)).flatMap(updateResult -> findById(page.getId()).toSingle());
}
代码示例来源:origin: gravitee-io/graviteeio-access-management
@Override
public Single<ExtensionGrant> create(ExtensionGrant item) {
ExtensionGrantMongo extensionGrant = convert(item);
extensionGrant.setId(extensionGrant.getId() == null ? (String) idGenerator.generate() : extensionGrant.getId());
return Single.fromPublisher(extensionGrantsCollection.insertOne(extensionGrant)).flatMap(success -> findById(extensionGrant.getId()).toSingle());
}
代码示例来源:origin: gravitee-io/graviteeio-access-management
@Override
public Single<Role> create(Role item) {
RoleMongo role = convert(item);
role.setId(role.getId() == null ? (String) idGenerator.generate() : role.getId());
return Single.fromPublisher(rolesCollection.insertOne(role)).flatMap(success -> findById(role.getId()).toSingle());
}
内容来源于网络,如有侵权,请联系作者删除!