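This article collects code examples for the Java method io.reactivex.Single.flattenAsObservable() and shows how Single.flattenAsObservable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of Single.flattenAsObservable() are as follows: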
Fully qualified class: io.reactivex.Single
Class name: Single
Method name: flattenAsObservable
Returns an Observable that maps a success value into an Iterable and emits its items.
Scheduler: flattenAsObservable does not operate by default on a particular Scheduler.
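The description above can be illustrated with a minimal sketch, assuming RxJava 2.x (the `io.reactivex` package) is on the classpath. The class name `FlattenAsObservableSketch` and the helper `expand` are hypothetical names chosen for this illustration; only `Single.just`, `flattenAsObservable`, `toList`, and `blockingGet` are RxJava calls.

```java
import io.reactivex.Single;

import java.util.Arrays;
import java.util.List;

public class FlattenAsObservableSketch {

    // Hypothetical helper: maps the single success value to three consecutive
    // integers and collects what the resulting Observable emits.
    static List<Integer> expand(int seed) {
        return Single.just(seed)
            // The mapper returns an Iterable; each element becomes one onNext item.
            .flattenAsObservable(v -> Arrays.asList(v, v + 1, v + 2))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(expand(1)); // prints [1, 2, 3]
    }
}
```

Blocking with `blockingGet()` is used here only to keep the sketch self-contained; in application code the Observable would normally be subscribed to instead.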
Code example source: ReactiveX/RxJava
// Excerpt: the enclosing call that receives this anonymous Function is cut off in the source.
    @Override
    public ObservableSource<Integer> apply(Single<Object> o) throws Exception {
        return o.flattenAsObservable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                return Collections.singleton(1);
            }
        });
    }
});
Code example source: ReactiveX/RxJava
@Test
public void fusedEmptyCheck() {
    Single.just(1)
    .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
            // ... the rest of this excerpt is cut off in the source
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Single.just(1).flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
            return Collections.singleton(1);
        }
    }));
}
Code example source: ReactiveX/RxJava
@Test
public void iteratorCrash() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            // CrashingIterable is an RxJava test helper; here the failure surfaces in iterator()
            return new CrashingIterable(1, 100, 100);
        }
    })
    .test()
    .assertFailureAndMessage(TestException.class, "iterator()");
}
Code example source: ReactiveX/RxJava
@Test
public void hasNextCrash() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            // Here the failure surfaces in hasNext()
            return new CrashingIterable(100, 1, 100);
        }
    })
    .test()
    .assertFailureAndMessage(TestException.class, "hasNext()");
}
Code example source: ReactiveX/RxJava
@Test
public void nextCrash() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            // Here the failure surfaces in next()
            return new CrashingIterable(100, 100, 1);
        }
    })
    .test()
    .assertFailureAndMessage(TestException.class, "next()");
}
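The crash tests above rely on `CrashingIterable`, a helper that lives in RxJava's own test sources. As a rough sketch of the same idea outside that test suite, assuming RxJava 2.x is on the classpath, the example below uses a hypothetical `FailingIterable` whose `iterator()` always throws, and shows the exception arriving downstream as an error signal that `onErrorReturn` can handle. The names `MapperFailureSketch`, `FailingIterable`, and `describeFailure` are illustrative only.

```java
import io.reactivex.Single;

import java.util.Iterator;

public class MapperFailureSketch {

    // Hypothetical stand-in for the test-only CrashingIterable:
    // iterator() always throws.
    static final class FailingIterable implements Iterable<Integer> {
        @Override
        public Iterator<Integer> iterator() {
            throw new IllegalStateException("iterator()");
        }
    }

    // Returns a description of the first signal seen downstream.
    static String describeFailure() {
        return Single.just(1)
            .flattenAsObservable(v -> new FailingIterable())
            .map(i -> "value: " + i)
            // The exception thrown by iterator() is delivered as onError,
            // so it can be replaced with a fallback item here.
            .onErrorReturn(e -> "failed: " + e.getMessage())
            .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints failed: iterator()
    }
}
```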
Code example source: ReactiveX/RxJava
@Test
public void normal() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return Arrays.asList(v, v + 1);
        }
    })
    .test()
    .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void hasNextCrash2() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return new CrashingIterable(100, 2, 100);
        }
    })
    .test()
    // One value (0) is emitted before hasNext() fails
    .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}
Code example source: ReactiveX/RxJava
@Test
public void emptyIterable() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return Collections.<Integer>emptyList();
        }
    })
    .test()
    // An empty Iterable completes without emitting any item
    .assertResult();
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    Single.<Integer>error(new TestException()).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return Arrays.asList(v, v + 1);
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void take() {
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return Arrays.asList(v, v + 1);
        }
    })
    .take(1)
    .test()
    .assertResult(1);
}
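The `take` test above stops after the first item of a two-element list. The sketch below pushes the same idea further with an endless Iterable, assuming RxJava 2.x is on the classpath and assuming, as the test above suggests, that the operator stops pulling from the iterator once the downstream has disposed. `LazyTakeSketch`, `countingFrom`, and `firstThree` are hypothetical names used only for this illustration.

```java
import io.reactivex.Single;

import java.util.List;
import java.util.stream.Stream;

public class LazyTakeSketch {

    // Hypothetical helper: an Iterable that counts upward from `start` without end.
    static Iterable<Integer> countingFrom(int start) {
        return () -> Stream.iterate(start, i -> i + 1).iterator();
    }

    static List<Integer> firstThree() {
        return Single.just(0)
            .flattenAsObservable(start -> countingFrom(start))
            // take(3) disposes the upstream after three items, so the
            // endless iterator is not drained any further.
            .take(3)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstThree());
    }
}
```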
Code example source: ReactiveX/RxJava
@Test
public void fused() {
    TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return Arrays.asList(v, v + 1);
        }
    })
    .subscribe(to);
    to.assertOf(ObserverFusion.<Integer>assertFuseable())
    .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
    .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void fusedNoSync() {
    TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.SYNC);
    Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return Arrays.asList(v, v + 1);
        }
    })
    .subscribe(to);
    to.assertOf(ObserverFusion.<Integer>assertFuseable())
    .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
    .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void async2() {
    Single.just(1)
    .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
            Integer[] array = new Integer[1000 * 1000];
            Arrays.fill(array, 1);
            return Arrays.asList(array);
        }
    })
    .observeOn(Schedulers.single())
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertSubscribed()
    .assertValueCount(1000 * 1000)
    .assertNoErrors()
    .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void async1() {
    Single.just(1)
    .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
            Integer[] array = new Integer[1000 * 1000];
            Arrays.fill(array, 1);
            return Arrays.asList(array);
        }
    })
    .hide()
    .observeOn(Schedulers.single())
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertSubscribed()
    .assertValueCount(1000 * 1000)
    .assertNoErrors()
    .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void async4() {
    Single.just(1)
    .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
            Integer[] array = new Integer[1000 * 1000];
            Arrays.fill(array, 1);
            return Arrays.asList(array);
        }
    })
    .observeOn(Schedulers.single())
    .take(500 * 1000)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertSubscribed()
    .assertValueCount(500 * 1000)
    .assertNoErrors()
    .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void async3() {
    Single.just(1)
    .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
            Integer[] array = new Integer[1000 * 1000];
            Arrays.fill(array, 1);
            return Arrays.asList(array);
        }
    })
    .take(500 * 1000)
    .observeOn(Schedulers.single())
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertSubscribed()
    .assertValueCount(500 * 1000)
    .assertNoErrors()
    .assertComplete();
}
Code example source: akarnokd/akarnokd-misc
@Test
public void test() throws Exception {
    Observable<String> first = Observable.fromCallable(() -> "HEY").delay(250, TimeUnit.MILLISECONDS);
    Observable<Integer> second = Observable.fromCallable(() -> 1).delay(350, TimeUnit.MILLISECONDS);
    List<Observable<?>> observables = com.google.common.collect.Lists.newArrayList(first, second);
    Map<Long, Object> someWeirdMapWithObject = com.google.common.collect.ImmutableMap.of(
        1L, new BrandBuilder(1),
        2L, new BrandBuilder(2)
    );
    Observable
        .fromIterable(observables)
        .flatMap(task -> task.observeOn(Schedulers.computation()))
        // wait for all tasks to finish
        .lastOrError()
        .flattenAsObservable(x -> someWeirdMapWithObject.values())
        .<BrandBuilder>cast(BrandBuilder.class)
        .map(BrandBuilder::build)
        .toList().blockingGet();
}
Code example source: uk.os.vt/vt-mbtiles
@Override
public void putEntries(Observable<Entry> entries) {
    final String insert =
        "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
            + " values (?, ?, ?, ?);";
    final Observable<Object> params = entries.concatMap(entry -> {
        byte[] compressedMvt;
        try {
            compressedMvt = CompressUtil.getCompressedAsGzip(entry.getVector());
        } catch (final IOException ex) {
            throw Exceptions.propagate(ex);
        }
        return Observable.<Object>just(entry.getZoomLevel(), entry.getColumn(),
            flipY(entry.getRow(), entry.getZoomLevel()), compressedMvt);
    })
    // source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files
    .toList()
    .flattenAsObservable(objects -> objects);
    // TODO update when upstream is enhanced
    dataSource.update(insert)
        .parameterStream(params.toFlowable(BackpressureStrategy.BUFFER))
        .counts()
        .test() // TODO remove hack
        .awaitDone(5, TimeUnit.SECONDS)
        .assertComplete();
}
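The `toList().flattenAsObservable(objects -> objects)` step in the example above collects every upstream item into one list and then re-emits the elements one by one. A minimal sketch of that idiom in isolation, assuming RxJava 2.x is on the classpath, could look like the following. `CollectThenReemitSketch` and `reemit` are hypothetical names, and the upper-casing step is added only to show that the result is an ordinary Observable again.

```java
import io.reactivex.Observable;

import java.util.List;

public class CollectThenReemitSketch {

    static List<String> reemit() {
        return Observable.just("z", "x", "y")
            // Single<List<String>>: emits only after the upstream has completed.
            .toList()
            // Identity mapper: the collected list is itself the Iterable to emit.
            .flattenAsObservable(items -> items)
            .map(String::toUpperCase)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(reemit()); // prints [Z, X, Y]
    }
}
```

Because `toList()` waits for completion, nothing is re-emitted until the whole upstream has finished, so this pattern buffers all items in memory.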