io.reactivex.Single.flattenAsObservable()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(141)

本文整理了Java中io.reactivex.Single.flattenAsObservable()方法的一些代码示例,展示了Single.flattenAsObservable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.flattenAsObservable()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:flattenAsObservable

Single.flattenAsObservable介绍

[英]Returns an Observable that maps a success value into an Iterable and emits its items.

Scheduler: flattenAsObservable does not operate by default on a particular Scheduler.
[中]返回将成功值映射为Iterable并发出其项的Observable。
调度器:默认情况下,FlattasObservable不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Integer> apply(Single<Object> o) throws Exception {
    return o.flattenAsObservable(new Function<Object, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> apply(Object v) throws Exception {
        return Collections.singleton(1);
      }
    });
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedEmptyCheck() {
  Single.just(1)
  .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Single.just(1).flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          return Collections.singleton(1);
        }
      }));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void iteratorCrash() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(1, 100, 100);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "iterator()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void hasNextCrash() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(100, 1, 100);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nextCrash() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(100, 100, 1);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "next()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normal() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .test()
  .assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void hasNextCrash2() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return new CrashingIterable(100, 2, 100);
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()", 0);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void emptyIterable() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Collections.<Integer>emptyList();
    }
  })
  .test()
  .assertResult();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Single.<Integer>error(new TestException()).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .take(1)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fused() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult(1, 2);
  ;
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedNoSync() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.SYNC);
  Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.NONE))
  .assertResult(1, 2);
  ;
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async2() {
  Single.just(1)
  .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(1000 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async1() {
  Single.just(1)
  .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .hide()
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(1000 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async4() {
  Single.just(1)
  .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .observeOn(Schedulers.single())
  .take(500 * 1000)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(500 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void async3() {
  Single.just(1)
  .flattenAsObservable(new Function<Object, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Object v) throws Exception {
          Integer[] array = new Integer[1000 * 1000];
          Arrays.fill(array, 1);
          return Arrays.asList(array);
        }
      })
  .take(500 * 1000)
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(500 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test() throws Exception {
  Observable<String> first = Observable.fromCallable(() -> "HEY").delay(250, TimeUnit.MILLISECONDS);
  Observable<Integer> second = Observable.fromCallable(() -> 1).delay(350, TimeUnit.MILLISECONDS);
  List<Observable<?>> observables = com.google.common.collect.Lists.newArrayList(first, second);
  Map<Long, Object> someWeirdMapWithObject = com.google.common.collect.ImmutableMap.of(
      1L, new BrandBuilder(1),
      2L, new BrandBuilder(2)
  );
  Observable
      .fromIterable(observables)
      .flatMap(task -> task.observeOn(Schedulers.computation()))
      // wait for all tasks to finish
      .lastOrError()
      .flattenAsObservable(x -> someWeirdMapWithObject.values())
      .<BrandBuilder>cast(BrandBuilder.class)
      .map(BrandBuilder::build)
      .toList().blockingGet();
}

代码示例来源:origin: uk.os.vt/vt-mbtiles

@Override
public void putEntries(Observable<Entry> entries) {
 final String insert =
   "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
     + " values (?, ?, ?, ?);";
 final Observable<Object> params = entries.concatMap(entry -> {
  byte[] compressedMvt;
  try {
   compressedMvt = CompressUtil.getCompressedAsGzip(entry.getVector());
  } catch (final IOException ex) {
   throw Exceptions.propagate(ex);
  }
  return Observable.<Object>just(entry.getZoomLevel(), entry.getColumn(),
    flipY(entry.getRow(), entry.getZoomLevel()), compressedMvt);
 })
   // source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files
   .toList()
   .flattenAsObservable(objects -> objects);
 // TODO update when upstream is enhanced
 dataSource.update(insert)
   .parameterStream(params.toFlowable(BackpressureStrategy.BUFFER))
   .counts()
   .test() // TODO remove hack
   .awaitDone(5, TimeUnit.SECONDS)
   .assertComplete();
}

相关文章

微信公众号

最新文章

更多