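This article collects code examples for the io.reactivex.Single.create() method in Java and shows how Single.create() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, were extracted from selected open-source projects, and are intended as a practical reference. Details of Single.create() are as follows: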
Fully qualified name: io.reactivex.Single
Class name: Single
Method name: create
Provides an API (via a cold Single) that bridges the reactive world with the callback-style world.
Example:
Single.<Event>create(emitter -> {
    Callback listener = new Callback() {
        @Override
        public void onEvent(Event e) {
            emitter.onSuccess(e);
        }
        @Override
        public void onFailure(Exception e) {
            emitter.onError(e);
        }
    };
    AutoCloseable c = api.someMethod(listener);
    emitter.setCancellable(c::close);
});
Scheduler: create does not operate by default on a particular Scheduler.
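The description above calls the returned source cold and notes that create is not tied to a Scheduler. The following is a minimal JDK-only sketch of those two properties, not RxJava's implementation: ColdSourceSketch, Body, and demo are hypothetical names invented for illustration. It assumes that "cold" means the body is only stored by create and is executed once per subscribe call, on the calling thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Toy stand-in for a cold single-value source; NOT RxJava's Single. */
final class ColdSourceSketch<T> {
    /** Hypothetical counterpart of SingleOnSubscribe: receives a success callback. */
    interface Body<V> {
        void run(Consumer<V> onSuccess) throws Exception;
    }

    private final Body<T> body;

    private ColdSourceSketch(Body<T> body) {
        this.body = body;
    }

    /** Only stores the body; nothing is executed at creation time. */
    static <T> ColdSourceSketch<T> create(Body<T> body) {
        if (body == null) {
            throw new NullPointerException("body is null");
        }
        return new ColdSourceSketch<>(body);
    }

    /** Runs the body on the calling thread, once per call. */
    void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onError) {
        try {
            body.run(onSuccess);
        } catch (Exception ex) {
            // An exception thrown by the body is routed to the error callback.
            onError.accept(ex);
        }
    }

    /** Subscribes twice and reports how many times the body ran. */
    static int demo() {
        AtomicInteger runs = new AtomicInteger();
        ColdSourceSketch<String> source = ColdSourceSketch.create(onSuccess -> {
            runs.incrementAndGet();
            onSuccess.accept(Thread.currentThread().getName());
        });
        // At this point the body has not run yet.
        source.subscribe(v -> { }, e -> { });
        source.subscribe(v -> { }, e -> { });
        return runs.get();
    }
}
```

In this sketch demo() returns 2, because each subscribe call re-runs the stored body. With the real library, moving the work to another thread would be done by an operator such as subscribeOn rather than by create itself.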
Code example source: pwittchen/ReactiveNetwork
@Override public Single<Boolean> checkInternetConnectivity(final String host, final int port,
        final int timeoutInMs, final int httpResponse, final ErrorHandler errorHandler) {
    checkGeneralPreconditions(host, port, timeoutInMs, httpResponse, errorHandler);
    return Single.create(new SingleOnSubscribe<Boolean>() {
        @Override public void subscribe(@NonNull SingleEmitter<Boolean> emitter) {
            // The connectivity check runs when the Single is subscribed to.
            emitter.onSuccess(isConnected(host, port, timeoutInMs, httpResponse, errorHandler));
        }
    });
}
Code example source: ReactiveX/RxJava
@Test
public void createConsumerThrowsOnError() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            try {
                // The observer's onError throws, and the exception propagates back to this call.
                s.onError(new IOException());
                fail("Should have thrown");
            } catch (TestException ex) {
                // expected
            }
        }
    })
    .subscribe(new SingleObserver<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onSuccess(Object value) {
        }
        @Override
        public void onError(Throwable e) {
            throw new TestException();
        }
    });
}
Code example source: ReactiveX/RxJava
@Test
public void createConsumerThrows() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            try {
                // The observer's onSuccess throws, and the exception propagates back to this call.
                s.onSuccess(1);
                fail("Should have thrown");
            } catch (TestException ex) {
                // expected
            }
        }
    })
    .subscribe(new SingleObserver<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onSuccess(Object value) {
            throw new TestException();
        }
        @Override
        public void onError(Throwable e) {
        }
    });
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void nullArgument() {
    Single.create(null);
}
Code example source: ReactiveX/RxJava
@Test
public void createConsumerThrowsResourceOnError() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            Disposable d = Disposables.empty();
            s.setDisposable(d);
            try {
                s.onError(new IOException());
                fail("Should have thrown");
            } catch (TestException ex) {
                // expected
            }
            // The resource is disposed even though the observer threw.
            assertTrue(d.isDisposed());
        }
    })
    .subscribe(new SingleObserver<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onSuccess(Object value) {
        }
        @Override
        public void onError(Throwable e) {
            throw new TestException();
        }
    });
}
Code example source: ReactiveX/RxJava
@Test
public void createConsumerThrowsResource() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            Disposable d = Disposables.empty();
            s.setDisposable(d);
            try {
                s.onSuccess(1);
                fail("Should have thrown");
            } catch (TestException ex) {
                // expected
            }
            // The resource is disposed even though the observer threw.
            assertTrue(d.isDisposed());
        }
    })
    .subscribe(new SingleObserver<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onSuccess(Object value) {
            throw new TestException();
        }
        @Override
        public void onError(Throwable e) {
        }
    });
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            s.onSuccess(1);
        }
    }));
}
Code example source: ReactiveX/RxJava
@Test
public void createNullSuccess() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            // A null success value is reported to the observer as a NullPointerException.
            s.onSuccess(null);
            s.onSuccess(null);
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void emitterHasToString() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> emitter) throws Exception {
            assertTrue(emitter.toString().contains(SingleCreate.Emitter.class.getSimpleName()));
        }
    }).test().assertEmpty();
}
Code example source: ReactiveX/RxJava
@Test
public void createCallbackThrows() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            // An exception thrown by the callback is delivered to the observer as an error.
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void createNullError() {
    Single.create(new SingleOnSubscribe<Object>() {
        @Override
        public void subscribe(SingleEmitter<Object> s) throws Exception {
            // A null Throwable is reported to the observer as a NullPointerException.
            s.onError(null);
            s.onError(null);
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionIterable() {
    final int[] calls = { 0 };
    Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onSuccess(1);
        }
    });
    // firstElement() finishes after the first value, so the second source is never subscribed to.
    Single.concat(Arrays.asList(source, source)).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: ReactiveX/RxJava
@Test
public void tryOnError() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final Boolean[] response = { null };
        Single.create(new SingleOnSubscribe<Object>() {
            @Override
            public void subscribe(SingleEmitter<Object> e) throws Exception {
                e.onSuccess(1);
                // The emitter has already terminated, so tryOnError returns false.
                response[0] = e.tryOnError(new TestException());
            }
        })
        .test()
        .assertResult(1);
        assertFalse(response[0]);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: spring-projects/spring-framework
@Test
public void deferredResultSubscriberWithError() throws Exception {
    IllegalStateException ex = new IllegalStateException();
    // Mono
    MonoProcessor<String> mono = MonoProcessor.create();
    testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onError(ex), ex);
    // RxJava 1 Single
    AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
    Single<String> single = Single.fromEmitter(ref::set);
    testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onError(ex), ex);
    // RxJava 2 Single
    AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
    io.reactivex.Single<String> single2 = io.reactivex.Single.create(ref2::set);
    testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
            () -> ref2.get().onError(ex), ex);
}
Code example source: line/armeria
private static Single<String> single(String input) {
    RequestContext.current();
    return Single.create(emitter -> {
        RequestContext.current();
        emitter.onSuccess(input);
    });
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscription() {
    final int[] calls = { 0 };
    Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onSuccess(1);
        }
    });
    // firstElement() finishes after the first value, so the second source is never subscribed to.
    Single.concatArray(source, source).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: ReactiveX/RxJava
@Test
public void basicWithError() {
    final Disposable d = Disposables.empty();
    Single.<Integer>create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> e) throws Exception {
            e.setDisposable(d);
            // Only the first terminal signal reaches the observer.
            e.onError(new TestException());
            e.onSuccess(2);
            e.onError(new TestException());
        }
    })
    .test()
    .assertFailure(TestException.class);
    assertTrue(d.isDisposed());
}
Code example source: ReactiveX/RxJava
@Test
public void basic() {
    final Disposable d = Disposables.empty();
    Single.<Integer>create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> e) throws Exception {
            e.setDisposable(d);
            // Only the first terminal signal reaches the observer.
            e.onSuccess(1);
            e.onError(new TestException());
            e.onSuccess(2);
            e.onError(new TestException());
        }
    })
    .test()
    .assertResult(1);
    assertTrue(d.isDisposed());
}
Code example source: spring-projects/spring-framework
@Test
public void deferredResultSubscriberWithOneValue() throws Exception {
    // Mono
    MonoProcessor<String> mono = MonoProcessor.create();
    testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onNext("foo"), "foo");
    // Mono empty
    MonoProcessor<String> monoEmpty = MonoProcessor.create();
    testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null);
    // RxJava 1 Single
    AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
    Single<String> single = Single.fromEmitter(ref::set);
    testDeferredResultSubscriber(single, Single.class, forClass(String.class),
            () -> ref.get().onSuccess("foo"), "foo");
    // RxJava 2 Single
    AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
    io.reactivex.Single<String> single2 = io.reactivex.Single.create(ref2::set);
    testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
            () -> ref2.get().onSuccess("foo"), "foo");
}
Code example source: ReactiveX/RxJava
@Test
public void basicWithCancellable() {
    final Disposable d1 = Disposables.empty();
    final Disposable d2 = Disposables.empty();
    Single.<Integer>create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> e) throws Exception {
            e.setDisposable(d1);
            // Setting a Cancellable replaces (and disposes) the previously set resource d1.
            e.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    d2.dispose();
                }
            });
            e.onSuccess(1);
            e.onError(new TestException());
            e.onSuccess(2);
            e.onError(new TestException());
        }
    })
    .test()
    .assertResult(1);
    assertTrue(d1.isDisposed());
    assertTrue(d2.isDisposed());
}
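The tests above (basic, basicWithError, tryOnError, basicWithCancellable) all exercise one rule: the first terminal signal wins, later signals are not delivered, and the registered resource is released when the emitter terminates. The following JDK-only sketch models that rule under stated assumptions. OneShotEmitterSketch, setCleanup, and demo are hypothetical names, this is not RxJava's SingleCreate.Emitter, and the model leaves out how RxJava handles an onError call that arrives after termination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy emitter that records what a downstream observer would see. */
final class OneShotEmitterSketch<T> {
    private final AtomicBoolean terminated = new AtomicBoolean();
    private final List<String> delivered = new ArrayList<>();
    private Runnable cleanup = () -> { };

    /** Hypothetical analogue of setCancellable: registers a cleanup action. */
    void setCleanup(Runnable action) {
        cleanup = action;
    }

    /** Delivers the value only if no terminal signal was delivered before. */
    void onSuccess(T value) {
        if (terminated.compareAndSet(false, true)) {
            try {
                delivered.add("success:" + value);
            } finally {
                cleanup.run(); // release the resource exactly once
            }
        }
    }

    /** Model of tryOnError: returns false when the error cannot be delivered. */
    boolean tryOnError(Throwable error) {
        if (terminated.compareAndSet(false, true)) {
            try {
                delivered.add("error:" + error.getClass().getSimpleName());
            } finally {
                cleanup.run();
            }
            return true;
        }
        return false;
    }

    List<String> delivered() {
        return delivered;
    }

    /** Replays the call sequence of the tests above against the model. */
    static List<String> demo() {
        OneShotEmitterSketch<Integer> e = new OneShotEmitterSketch<>();
        int[] cleanups = { 0 };
        e.setCleanup(() -> cleanups[0]++);
        e.onSuccess(1);
        boolean accepted = e.tryOnError(new IllegalStateException());
        e.onSuccess(2);
        List<String> out = new ArrayList<>(e.delivered());
        out.add("lateErrorAccepted:" + accepted);
        out.add("cleanups:" + cleanups[0]);
        return out;
    }
}
```

In this sketch demo() returns [success:1, lateErrorAccepted:false, cleanups:1]: the second success and the late error are dropped, and the cleanup action runs once. The compare-and-set on a single flag is one simple way to make the "first signal wins" decision safe when signals arrive from different threads.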