cyclops.reactive.ReactiveSeq.parallel()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(99)

本文整理了Java中cyclops.reactive.ReactiveSeq.parallel方法的一些代码示例,展示了ReactiveSeq.parallel的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.parallel方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:parallel

ReactiveSeq.parallel介绍

暂无

代码示例

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into four copies, hands each copy to {@code parallel(fj, pathN)},
 * and combines the four resulting streams with {@code zip4}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @param path4 stream transformation used for the fourth copy
 * @param zipFn function given to {@code zip4} to combine one value from each path
 * @return the stream returned by {@code zip4}
 */
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> parallelFanOutZipIn(ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
    Function<? super Stream<T>, ? extends Stream<? extends R4>> path4,
    Function4<? super R1, ? super R2, ? super R3, ? super R4, ? extends R5> zipFn){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against quadruplicate.
  val copies = quadruplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R1> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R2> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R3> third = copies._3().parallel(fj, path3);
  ReactiveSeq<R4> fourth = copies._4().parallel(fj, path4);

  return first.zip4(second, third, fourth, zipFn);
}

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into four copies, hands each copy to {@code parallel(fj, pathN)},
 * and merges the four resulting streams with {@code mergeP}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @param path4 stream transformation used for the fourth copy
 * @return the stream returned by {@code mergeP}
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path3,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path4){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against quadruplicate.
  val copies = quadruplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R> third = copies._3().parallel(fj, path3);
  ReactiveSeq<R> fourth = copies._4().parallel(fj, path4);

  return first.mergeP(second, third, fourth);
}

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into three copies, hands each copy to {@code parallel(fj, pathN)},
 * and combines the three resulting streams with {@code zip3}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @param zipFn function given to {@code zip3} to combine one value from each path
 * @return the stream returned by {@code zip3}
 */
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
    Function3<? super R1, ? super R2, ? super R3, ? extends R4> zipFn){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against triplicate.
  Tuple3<ReactiveSeq<T>, ReactiveSeq<T>, ReactiveSeq<T>> copies =
      triplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R1> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R2> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R3> third = copies._3().parallel(fj, path3);

  return first.zip3(second, third, zipFn);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into three copies, hands each copy to {@code parallel(fj, pathN)},
 * and merges the three resulting streams with {@code mergeP}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @return the stream returned by {@code mergeP}
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                     Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
                     Function<? super Stream<T>, ? extends Stream<? extends R>> path3){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against triplicate.
  Tuple3<ReactiveSeq<T>, ReactiveSeq<T>,ReactiveSeq<T>> d = triplicate(()->new ArrayDeque<T>(100));
  // The path functions are only passed on to parallel(...); they are not applied
  // to the copies directly in this method.
  ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
  ReactiveSeq<R> res3 = d._3().parallel(fj, path3);
  return res1.mergeP(res2,res3);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into two copies, hands each copy to {@code parallel(fj, pathN)},
 * and combines the two resulting streams with {@code zip}.
 *
 * @param fj    pool passed to both {@code parallel} calls
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param zipFn function given to {@code zip} to combine one value from each path
 * @return the stream returned by {@code zip}
 */
default <R1,R2,R3> ReactiveSeq<R3> parallelFanOutZipIn(ForkJoinPool fj, Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
                        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
                        BiFunction<? super R1, ? super R2, ? extends R3> zipFn){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against duplicate.
  Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(()->new ArrayDeque<T>(100));
  // The path functions are only passed on to parallel(...); they are not applied
  // to the copies directly in this method.
  ReactiveSeq<R1> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R2> res2 = d._2().parallel(fj, path2);
  return res1.zip(res2,zipFn);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into two copies, hands each copy to {@code parallel(fj, pathN)},
 * and merges the two resulting streams with {@code mergeP}.
 *
 * @param fj    pool passed to both {@code parallel} calls
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @return the stream returned by {@code mergeP}
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                 Function<? super Stream<T>, ? extends Stream<? extends R>> path2){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against duplicate.
  Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(()->new ArrayDeque<T>(100));
  // The path functions are only passed on to parallel(...); they are not applied
  // to the copies directly in this method.
  ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
  return res1.mergeP(res2);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Runs the single-argument {@code parallel} over {@code range(0, 1000)} one thousand
 * times and asserts on every run that the resulting stream counts 1000 elements.
 */
@Test
public void testParallel(){
  int iteration = 0;
  while (iteration < 1000) {
    System.out.println("Iteration " + iteration);
    long counted = ReactiveSeq.range(0, 1000)
        .parallel(s -> s.map(i -> i * 2))
        .count();
    assertThat(counted, equalTo(1000L));
    iteration++;
  }
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Runs {@code parallel(ForkJoinPool, fn)} over {@code range(0, 1000)} one thousand
 * times and asserts on every run that the resulting stream counts 1000 elements.
 */
@Test
public void testParallel2(){
  for(int k=0;k<1000;k++) {
    System.out.println("Iteration " + k);
    // Still a fresh pool per iteration, but it is now shut down once the count
    // has been taken so that 1000 pools' worth of worker threads do not pile up.
    ForkJoinPool pool = new ForkJoinPool(10);
    try {
      assertThat(ReactiveSeq.range(0, 1000)
          .parallel(pool,s -> s.map(i -> i * 2))
          .count(), equalTo(1000L));
    } finally {
      // shutdown() is the orderly variant: already-submitted tasks still run.
      pool.shutdown();
    }
  }
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Delegates to the {@code ReactiveSeq} default implementation of {@code parallel}
 * and wraps the result with {@code fromStream}.
 *
 * @param fn stream transformation forwarded unchanged to {@code ReactiveSeq.super.parallel}
 * @return the value returned by {@code fromStream}
 */
@Override
default <R> FutureStream<R> parallel(Function<? super Stream<U>,? extends Stream<? extends R>> fn){
  ReactiveSeq<R> delegated = ReactiveSeq.super.parallel(fn);
  return fromStream(delegated);
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Splits this stream into four copies, hands each copy to {@code parallel(fj, pathN)},
 * and combines the four resulting streams with {@code zip4}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @param path4 stream transformation used for the fourth copy
 * @param zipFn function given to {@code zip4} to combine one value from each path
 * @return the stream returned by {@code zip4}
 */
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> parallelFanOutZipIn(ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
    Function<? super Stream<T>, ? extends Stream<? extends R4>> path4,
    Function4<? super R1, ? super R2, ? super R3, ? super R4, ? extends R5> zipFn){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against quadruplicate.
  val copies = quadruplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R1> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R2> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R3> third = copies._3().parallel(fj, path3);
  ReactiveSeq<R4> fourth = copies._4().parallel(fj, path4);

  return first.zip4(second, third, fourth, zipFn);
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Splits this stream into three copies, hands each copy to {@code parallel(fj, pathN)},
 * and combines the three resulting streams with {@code zip3}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @param zipFn function given to {@code zip3} to combine one value from each path
 * @return the stream returned by {@code zip3}
 */
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
    Function3<? super R1, ? super R2, ? super R3, ? extends R4> zipFn){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against triplicate.
  Tuple3<ReactiveSeq<T>, ReactiveSeq<T>, ReactiveSeq<T>> copies =
      triplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R1> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R2> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R3> third = copies._3().parallel(fj, path3);

  return first.zip3(second, third, zipFn);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Splits this stream into four copies, hands each copy to {@code parallel(fj, pathN)},
 * and merges the four resulting streams with {@code mergeP}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @param path4 stream transformation used for the fourth copy
 * @return the stream returned by {@code mergeP}
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path3,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path4){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against quadruplicate.
  val copies = quadruplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R> third = copies._3().parallel(fj, path3);
  ReactiveSeq<R> fourth = copies._4().parallel(fj, path4);

  return first.mergeP(second, third, fourth);
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Calls {@code parallel(fj, fn)} on the value returned by {@code stream()} and wraps
 * the result with {@code fromStream}.
 *
 * @param fj pool forwarded unchanged to the underlying {@code parallel} call
 * @param fn stream transformation forwarded unchanged to the underlying {@code parallel} call
 * @return the value returned by {@code fromStream}
 */
@Override
default <R> FutureStream<R> parallel(ForkJoinPool fj, Function<? super Stream<U>, ? extends Stream<? extends R>> fn) {
  return fromStream(stream().parallel(fj,fn));
}
@Override

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Splits this stream into three copies, hands each copy to {@code parallel(fj, pathN)},
 * and merges the three resulting streams with {@code mergeP}.
 *
 * @param fj    pool passed to every {@code parallel} call
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param path3 stream transformation used for the third copy
 * @return the stream returned by {@code mergeP}
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                     Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
                     Function<? super Stream<T>, ? extends Stream<? extends R>> path3){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against triplicate.
  Tuple3<ReactiveSeq<T>, ReactiveSeq<T>,ReactiveSeq<T>> d = triplicate(()->new ArrayDeque<T>(100));
  // The path functions are only passed on to parallel(...); they are not applied
  // to the copies directly in this method.
  ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
  ReactiveSeq<R> res3 = d._3().parallel(fj, path3);
  return res1.mergeP(res2,res3);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Splits this stream into two copies, hands each copy to {@code parallel(fj, pathN)},
 * and combines the two resulting streams with {@code zip}.
 *
 * @param fj    pool passed to both {@code parallel} calls
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @param zipFn function given to {@code zip} to combine one value from each path
 * @return the stream returned by {@code zip}
 */
default <R1,R2,R3> ReactiveSeq<R3> parallelFanOutZipIn(ForkJoinPool fj, Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
                        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
                        BiFunction<? super R1, ? super R2, ? extends R3> zipFn){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against duplicate.
  Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(()->new ArrayDeque<T>(100));
  // The path functions are only passed on to parallel(...); they are not applied
  // to the copies directly in this method.
  ReactiveSeq<R1> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R2> res2 = d._2().parallel(fj, path2);
  return res1.zip(res2,zipFn);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Splits this stream into two copies, hands each copy to {@code parallel(fj, pathN)},
 * and merges the two resulting streams with {@code mergeP}.
 *
 * @param fj    pool passed to both {@code parallel} calls
 * @param path1 stream transformation used for the first copy
 * @param path2 stream transformation used for the second copy
 * @return the stream returned by {@code mergeP}
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                 Function<? super Stream<T>, ? extends Stream<? extends R>> path2){
  // The supplier yields ArrayDeques created with an initial capacity of 100;
  // presumably these back the individual copies — confirm against duplicate.
  Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(()->new ArrayDeque<T>(100));
  // The path functions are only passed on to parallel(...); they are not applied
  // to the copies directly in this method.
  ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
  return res1.mergeP(res2);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,

相关文章

微信公众号

最新文章

更多

ReactiveSeq类方法