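This article collects code examples for the Java method io.reactivex.Single.flattenAsFlowable() and shows how Single.flattenAsFlowable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of Single.flattenAsFlowable() are as follows: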
Fully qualified class name: io.reactivex.Single
Class name: Single
Method name: flattenAsFlowable
Returns a Flowable that maps the item emitted by the source Single to an Iterable, using the supplied selector function, and emits the values of that Iterable in order.
Backpressure: The operator honors backpressure from downstream.
Scheduler: flattenAsFlowable does not operate by default on a particular Scheduler.
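To make the backpressure note concrete, here is a minimal plain-Java sketch of the idea. It is a model only, not RxJava's actual implementation: a single value is mapped to an Iterable by a selector, and items are pulled from its iterator only as the consumer asks for them. The class name DemandDrivenFlatten and its request and isDone methods are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of "flatten a single value on demand"; not an RxJava class. */
final class DemandDrivenFlatten<T, R> {
    private final Iterator<R> iterator;

    DemandDrivenFlatten(T value, Function<T, Iterable<R>> selector) {
        // The selector runs once, because a Single emits exactly one value.
        this.iterator = selector.apply(value).iterator();
    }

    /** Hands out at most n further items, mimicking a downstream request(n). */
    List<R> request(long n) {
        List<R> out = new ArrayList<>();
        while (n-- > 0 && iterator.hasNext()) {
            out.add(iterator.next());
        }
        return out;
    }

    /** True once the underlying Iterable is exhausted (the point of completion). */
    boolean isDone() {
        return !iterator.hasNext();
    }

    public static void main(String[] args) {
        DemandDrivenFlatten<Integer, Integer> flat =
                new DemandDrivenFlatten<>(1, v -> Arrays.asList(v, v + 1));
        System.out.println(flat.request(0)); // prints []
        System.out.println(flat.request(1)); // prints [1]
        System.out.println(flat.request(1)); // prints [2]
        System.out.println(flat.isDone());   // prints true
    }
}
```

The same request-by-request pattern appears in the backpressure test further down, where a TestSubscriber starts with zero demand and requests one item at a time.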
Code example source: ReactiveX/RxJava
@Test
public void fusedEmptyCheck() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                // ... (the rest of this example is cut off in the source)
Code example source: ReactiveX/RxJava
@Test
public void hasNextThrows() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                return new CrashingIterable(100, 2, 100);
            }
        })
        .test(2L)
        .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}
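CrashingIterable is a helper from RxJava's own test sources and is not shown in these excerpts. The sketch below is a hypothetical stand-in inferred from how the tests use it: each constructor argument is assumed to be the call number on which iterator(), hasNext(), or next() throws. The real helper throws RxJava's TestException; IllegalStateException is substituted here so that the sketch compiles with the JDK alone, and the name CrashingIterableSketch and the drain method are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the CrashingIterable test helper; details are assumptions. */
final class CrashingIterableSketch implements Iterable<Integer> {
    private int crashOnIterator;
    private final int crashOnHasNext;
    private final int crashOnNext;

    CrashingIterableSketch(int crashOnIterator, int crashOnHasNext, int crashOnNext) {
        this.crashOnIterator = crashOnIterator;
        this.crashOnHasNext = crashOnHasNext;
        this.crashOnNext = crashOnNext;
    }

    @Override
    public Iterator<Integer> iterator() {
        if (--crashOnIterator <= 0) {
            throw new IllegalStateException("iterator()");
        }
        return new Iterator<Integer>() {
            private int hasNextLeft = crashOnHasNext;
            private int nextLeft = crashOnNext;
            private int count;

            @Override
            public boolean hasNext() {
                if (--hasNextLeft <= 0) {
                    throw new IllegalStateException("hasNext()");
                }
                return true; // the sequence only ends by crashing
            }

            @Override
            public Integer next() {
                if (--nextLeft <= 0) {
                    throw new IllegalStateException("next()");
                }
                return count++; // emits 0, 1, 2, ...
            }
        };
    }

    /** Iterates until the source throws; reports the items seen and the failure message. */
    static String drain(Iterable<Integer> source) {
        List<Integer> seen = new ArrayList<>();
        try {
            for (Integer i : source) {
                seen.add(i);
            }
            return seen + " completed";
        } catch (IllegalStateException ex) {
            return seen + " failed at " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(drain(new CrashingIterableSketch(100, 2, 100))); // prints [0] failed at hasNext()
        System.out.println(drain(new CrashingIterableSketch(100, 100, 1))); // prints [] failed at next()
        System.out.println(drain(new CrashingIterableSketch(1, 100, 100))); // prints [] failed at iterator()
    }
}
```

Under these assumptions, (100, 2, 100) lets one item (0) through before the second hasNext() call fails, which matches the trailing 0 in assertFailureAndMessage(TestException.class, "hasNext()", 0).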
Code example source: ReactiveX/RxJava
@Test
public void nextThrows() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                return new CrashingIterable(100, 100, 1);
            }
        })
        .test(2L)
        .assertFailureAndMessage(TestException.class, "next()");
}
Code example source: ReactiveX/RxJava
@Test
public void normal() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Arrays.asList(v, v + 1);
            }
        })
        .test()
        .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void emptyIterable() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Collections.<Integer>emptyList();
            }
        })
        .test()
        // An empty Iterable completes without emitting any item.
        .assertResult();
}
Code example source: ReactiveX/RxJava
@Test
public void nextCrash() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return new CrashingIterable(100, 100, 1);
            }
        })
        .test()
        .assertFailureAndMessage(TestException.class, "next()");
}
Code example source: ReactiveX/RxJava
@Test
public void hasNextThrowsUnbounded() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                return new CrashingIterable(100, 2, 100);
            }
        })
        .test()
        .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}
Code example source: ReactiveX/RxJava
@Test
public void iteratorCrash() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return new CrashingIterable(1, 100, 100);
            }
        })
        .test()
        .assertFailureAndMessage(TestException.class, "iterator()");
}
Code example source: ReactiveX/RxJava
@Test
public void hasNextCrash() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return new CrashingIterable(100, 1, 100);
            }
        })
        .test()
        .assertFailureAndMessage(TestException.class, "hasNext()");
}
Code example source: ReactiveX/RxJava
@Test
public void nextThrowsUnbounded() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                return new CrashingIterable(100, 100, 1);
            }
        })
        .test()
        .assertFailureAndMessage(TestException.class, "next()");
}
Code example source: ReactiveX/RxJava
@Test
public void hasNextCrash2() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return new CrashingIterable(100, 2, 100);
            }
        })
        .test()
        .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}
Code example source: ReactiveX/RxJava
@Test
public void take() {
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Arrays.asList(v, v + 1);
            }
        })
        // take(1) stops the sequence after the first item, so 2 is never delivered.
        .take(1)
        .test()
        .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    // The source Single fails, so the error is passed straight to the subscriber.
    Single.<Integer>error(new TestException())
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Arrays.asList(v, v + 1);
            }
        })
        .test()
        .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void backpressure() {
    // Start with zero demand: nothing may be emitted until request() is called.
    TestSubscriber<Integer> ts = Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Arrays.asList(v, v + 1);
            }
        })
        .test(0);
    ts.assertEmpty();
    ts.request(1);
    ts.assertValue(1);
    ts.request(1);
    ts.assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void fused() {
    // The subscriber asks for ANY fusion mode; the test expects ASYNC to be granted.
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Arrays.asList(v, v + 1);
            }
        })
        .subscribe(ts);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
        .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
        .assertResult(1, 2);
}
Code example source: ReactiveX/RxJava
@Test
public void fusedNoSync() {
    // The subscriber asks for SYNC only; the test expects the request to be declined (NONE).
    TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
    Single.just(1)
        .flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Integer v) throws Exception {
                return Arrays.asList(v, v + 1);
            }
        })
        .subscribe(ts);
    ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
        .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
        .assertResult(1, 2);
}
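The two fusion tests differ only in the mode the subscriber requests, and the outcome (ASYNC versus NONE) suggests a simple bit-mask negotiation. A minimal sketch of that idea follows. It assumes the QueueFuseable modes are bit flags with NONE = 0, SYNC = 1, ASYNC = 2 and ANY = SYNC | ASYNC, and that this operator grants only ASYNC; the class FusionSketch and its methods are hypothetical and are not RxJava code.

```java
/** Hypothetical model of fusion-mode negotiation; constants are assumed to mirror QueueFuseable. */
final class FusionSketch {
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int ANY = SYNC | ASYNC;

    /** Grants ASYNC when the requested mask includes it, otherwise declines with NONE. */
    static int requestFusion(int requestedMode) {
        return (requestedMode & ASYNC) != 0 ? ASYNC : NONE;
    }

    /** Readable name for a granted mode, for printing only. */
    static String name(int mode) {
        switch (mode) {
            case NONE: return "NONE";
            case SYNC: return "SYNC";
            case ASYNC: return "ASYNC";
            default: return "ANY";
        }
    }

    public static void main(String[] args) {
        System.out.println(name(requestFusion(ANY)));  // prints ASYNC, as in fused()
        System.out.println(name(requestFusion(SYNC))); // prints NONE, as in fusedNoSync()
    }
}
```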
Code example source: ReactiveX/RxJava
@Test
public void async2() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                Integer[] array = new Integer[1000 * 1000];
                Arrays.fill(array, 1);
                return Arrays.asList(array);
            }
        })
        // Deliver one million items to the subscriber on another thread.
        .observeOn(Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueCount(1000 * 1000)
        .assertNoErrors()
        .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void async3() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                Integer[] array = new Integer[1000 * 1000];
                Arrays.fill(array, 1);
                return Arrays.asList(array);
            }
        })
        // Here take() is applied before the thread hop.
        .take(500 * 1000)
        .observeOn(Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueCount(500 * 1000)
        .assertNoErrors()
        .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void async1() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                Integer[] array = new Integer[1000 * 1000];
                Arrays.fill(array, 1);
                return Arrays.asList(array);
            }
        })
        // hide() conceals the operator's identity, which prevents fusion with observeOn.
        .hide()
        .observeOn(Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueCount(1000 * 1000)
        .assertNoErrors()
        .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void async4() {
    Single.just(1)
        .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> apply(Object v) throws Exception {
                Integer[] array = new Integer[1000 * 1000];
                Arrays.fill(array, 1);
                return Arrays.asList(array);
            }
        })
        // Here take() is applied after the thread hop.
        .observeOn(Schedulers.single())
        .take(500 * 1000)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()
        .assertValueCount(500 * 1000)
        .assertNoErrors()
        .assertComplete();
}