io.reactivex.Single.flattenAsFlowable()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(112)

本文整理了Java中io.reactivex.Single.flattenAsFlowable()方法的一些代码示例,展示了Single.flattenAsFlowable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.flattenAsFlowable()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:flattenAsFlowable

Single.flattenAsFlowable介绍

[英]Returns a Flowable that merges each item emitted by the source Single with the values in an Iterable corresponding to that item that is generated by a selector.

Backpressure: The operator honors backpressure from downstream. Scheduler: flattenAsFlowable does not operate by default on a particular Scheduler.
[中]返回一个Flowable,它将源发出的每个项与选择器生成的该项对应的Iterable中的值合并。
背压:操作员接受来自下游的背压。Scheduler:FlattasFlowable默认情况下不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedEmptyCheck() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void hasNextThrows() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          return new CrashingIterable(100, 2, 100);
        }
      })
  .test(2L)
  .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nextThrows() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          return new CrashingIterable(100, 100, 1);
        }
      })
  .test(2L)
  .assertFailureAndMessage(TestException.class, "next()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normal() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .test()
  .assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void emptyIterable() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Collections.<Integer>emptyList();
    }
  })
  .test()
  .assertResult();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nextCrash() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(100, 100, 1);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "next()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void hasNextThrowsUnbounded() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          return new CrashingIterable(100, 2, 100);
        }
      })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void iteratorCrash() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(1, 100, 100);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "iterator()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void hasNextCrash() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(100, 1, 100);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nextThrowsUnbounded() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          return new CrashingIterable(100, 100, 1);
        }
      })
  .test()
  .assertFailureAndMessage(TestException.class, "next()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void hasNextCrash2() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(100, 2, 100);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .take(1)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Single.<Integer>error(new TestException()).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void backpressure() {
  TestSubscriber<Integer> ts = Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .test(0);
  ts.assertEmpty();
  ts.request(1);
  ts.assertValue(1);
  ts.request(1);
  ts.assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fused() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult(1, 2);
  ;
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedNoSync() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.SYNC);
  Single.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .subscribe(ts);
  ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
  .assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
  .assertResult(1, 2);
  ;
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async2() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(1000 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async3() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .take(500 * 1000)
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(500 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async1() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .hide()
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(1000 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async4() {
  Single.just(1)
  .flattenAsFlowable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .observeOn(Schedulers.single())
  .take(500 * 1000)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(500 * 1000)
  .assertNoErrors()
  .assertComplete();
}

相关文章

微信公众号

最新文章

更多