本文整理了Java中cyclops.reactive.ReactiveSeq.parallel方法的一些代码示例,展示了ReactiveSeq.parallel的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.parallel方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:parallel
暂无
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into four copies, passes each copy through its own path via
 * {@code parallel(fj, path)}, and zips the four resulting streams together with {@code zipFn}.
 *
 * @param fj    pool handed to every {@code parallel} call
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @param path3 transformation applied to the third copy
 * @param path4 transformation applied to the fourth copy
 * @param zipFn combines one element from each of the four branches into a result element
 * @return the stream produced by {@code zip4} over the four branches
 */
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> parallelFanOutZipIn(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
        Function<? super Stream<T>, ? extends Stream<? extends R4>> path4,
        Function4<? super R1, ? super R2, ? super R3, ? super R4, ? extends R5> zipFn) {
    // The supplier creates ArrayDeques with an initial capacity of 100
    // (presumably the per-copy buffer - confirm against quadruplicate).
    val copies = quadruplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R1> first = copies._1().parallel(fj, path1);
    ReactiveSeq<R2> second = copies._2().parallel(fj, path2);
    ReactiveSeq<R3> third = copies._3().parallel(fj, path3);
    ReactiveSeq<R4> fourth = copies._4().parallel(fj, path4);

    return first.zip4(second, third, fourth, zipFn);
}
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into four copies, passes each copy through its own path via
 * {@code parallel(fj, path)}, and merges the four resulting streams with {@code mergeP}.
 *
 * @param fj    pool handed to every {@code parallel} call
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @param path3 transformation applied to the third copy
 * @param path4 transformation applied to the fourth copy
 * @return the stream produced by {@code mergeP} over the four branches
 */
default <R> ReactiveSeq<R> parallelFanOut(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path3,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path4) {
    // The supplier creates ArrayDeques with an initial capacity of 100
    // (presumably the per-copy buffer - confirm against quadruplicate).
    val copies = quadruplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R> first = copies._1().parallel(fj, path1);
    ReactiveSeq<R> second = copies._2().parallel(fj, path2);
    ReactiveSeq<R> third = copies._3().parallel(fj, path3);
    ReactiveSeq<R> fourth = copies._4().parallel(fj, path4);

    return first.mergeP(second, third, fourth);
}
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into three copies, passes each copy through its own path via
 * {@code parallel(fj, path)}, and zips the three resulting streams together with {@code zipFn}.
 *
 * @param fj    pool handed to every {@code parallel} call
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @param path3 transformation applied to the third copy
 * @param zipFn combines one element from each of the three branches into a result element
 * @return the stream produced by {@code zip3} over the three branches
 */
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
        Function3<? super R1, ? super R2, ? super R3, ? extends R4> zipFn) {
    // The supplier creates ArrayDeques with an initial capacity of 100
    // (presumably the per-copy buffer - confirm against triplicate).
    Tuple3<ReactiveSeq<T>, ReactiveSeq<T>, ReactiveSeq<T>> copies =
            triplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R1> first = copies._1().parallel(fj, path1);
    ReactiveSeq<R2> second = copies._2().parallel(fj, path2);
    ReactiveSeq<R3> third = copies._3().parallel(fj, path3);

    return first.zip3(second, third, zipFn);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into three copies, passes each copy through its own path via
 * {@code parallel(fj, path)}, and merges the three resulting streams with {@code mergeP}.
 *
 * @param fj    pool handed to every {@code parallel} call
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @param path3 transformation applied to the third copy
 * @return the stream produced by {@code mergeP} over the three branches
 */
default <R> ReactiveSeq<R> parallelFanOut(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path3) {
    // The supplier creates ArrayDeques with an initial capacity of 100
    // (presumably the per-copy buffer - confirm against triplicate).
    Tuple3<ReactiveSeq<T>, ReactiveSeq<T>, ReactiveSeq<T>> d =
            triplicate(() -> new ArrayDeque<T>(100));

    // Each path is handed to parallel(...) only; no extra, discarded application of
    // the paths to the branches is performed here.
    ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
    ReactiveSeq<R> res3 = d._3().parallel(fj, path3);

    return res1.mergeP(res2, res3);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into two copies, passes each copy through its own path via
 * {@code parallel(fj, path)}, and zips the two resulting streams together with {@code zipFn}.
 *
 * @param fj    pool handed to both {@code parallel} calls
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @param zipFn combines one element from each branch into a result element
 * @return the stream produced by {@code zip} over the two branches
 */
default <R1,R2,R3> ReactiveSeq<R3> parallelFanOutZipIn(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
        BiFunction<? super R1, ? super R2, ? extends R3> zipFn) {
    // The supplier creates ArrayDeques with an initial capacity of 100
    // (presumably the per-copy buffer - confirm against duplicate).
    Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(() -> new ArrayDeque<T>(100));

    // Each path is handed to parallel(...) only; no extra, discarded application of
    // the paths to the branches is performed here.
    ReactiveSeq<R1> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R2> res2 = d._2().parallel(fj, path2);

    return res1.zip(res2, zipFn);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into two copies, passes each copy through its own path via
 * {@code parallel(fj, path)}, and merges the two resulting streams with {@code mergeP}.
 *
 * @param fj    pool handed to both {@code parallel} calls
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @return the stream produced by {@code mergeP} over the two branches
 */
default <R> ReactiveSeq<R> parallelFanOut(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path2) {
    // The supplier creates ArrayDeques with an initial capacity of 100
    // (presumably the per-copy buffer - confirm against duplicate).
    Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(() -> new ArrayDeque<T>(100));

    // Each path is handed to parallel(...) only; no extra, discarded application of
    // the paths to the branches is performed here.
    ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R> res2 = d._2().parallel(fj, path2);

    return res1.mergeP(res2);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Repeats 1000 times: maps the range [0, 1000) through the single-argument
 * {@code parallel} overload and checks that the resulting stream still counts 1000 elements.
 */
@Test
public void testParallel(){
    for (int k = 0; k < 1000; k++) {
        System.out.println("Iteration " + k);

        long elementCount = ReactiveSeq.range(0, 1000)
                                       .parallel(s -> s.map(i -> i * 2))
                                       .count();

        assertThat(elementCount, equalTo(1000L));
    }
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Repeats 1000 times: maps the range [0, 1000) through {@code parallel} with an explicit
 * {@link ForkJoinPool} of parallelism 10 and checks that the resulting stream still counts
 * 1000 elements.
 */
@Test
public void testParallel2(){
    for (int k = 0; k < 1000; k++) {
        System.out.println("Iteration " + k);

        // A fresh pool per iteration, as before, but now released once the iteration is
        // done so the test does not accumulate idle worker threads from 1000 pools.
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            assertThat(ReactiveSeq.range(0, 1000)
                                  .parallel(pool, s -> s.map(i -> i * 2))
                                  .count(), equalTo(1000L));
        } finally {
            pool.shutdown();
        }
    }
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Delegates to the {@code ReactiveSeq} default implementation of {@code parallel(fn)}
 * and wraps the outcome via {@code fromStream} so the result is a {@code FutureStream}.
 *
 * @param fn transformation forwarded unchanged to {@code ReactiveSeq.super.parallel}
 * @return the result of {@code fromStream} applied to the delegated stream
 */
@Override
default <R> FutureStream<R> parallel(Function<? super Stream<U>,? extends Stream<? extends R>> fn){
    final ReactiveSeq<R> delegated = ReactiveSeq.super.parallel(fn);
    return fromStream(delegated);
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Fans this stream out into four copies, routes each copy through its own path using
 * {@code parallel(fj, path)}, then recombines the four branches element-wise with {@code zipFn}.
 *
 * @param fj    pool passed to each {@code parallel} call
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param path3 transformation for the third copy
 * @param path4 transformation for the fourth copy
 * @param zipFn function given to {@code zip4} to combine one element per branch
 * @return the result of {@code zip4} across the four branches
 */
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> parallelFanOutZipIn(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
        Function<? super Stream<T>, ? extends Stream<? extends R4>> path4,
        Function4<? super R1, ? super R2, ? super R3, ? super R4, ? extends R5> zipFn) {
    // ArrayDeques from this supplier start with capacity 100
    // (presumably used by quadruplicate as per-copy buffers - TODO confirm).
    val branches = quadruplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R1> branch1 = branches._1().parallel(fj, path1);
    ReactiveSeq<R2> branch2 = branches._2().parallel(fj, path2);
    ReactiveSeq<R3> branch3 = branches._3().parallel(fj, path3);
    ReactiveSeq<R4> branch4 = branches._4().parallel(fj, path4);

    return branch1.zip4(branch2, branch3, branch4, zipFn);
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Fans this stream out into three copies, routes each copy through its own path using
 * {@code parallel(fj, path)}, then recombines the three branches element-wise with {@code zipFn}.
 *
 * @param fj    pool passed to each {@code parallel} call
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param path3 transformation for the third copy
 * @param zipFn function given to {@code zip3} to combine one element per branch
 * @return the result of {@code zip3} across the three branches
 */
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R3>> path3,
        Function3<? super R1, ? super R2, ? super R3, ? extends R4> zipFn) {
    // ArrayDeques from this supplier start with capacity 100
    // (presumably used by triplicate as per-copy buffers - TODO confirm).
    Tuple3<ReactiveSeq<T>, ReactiveSeq<T>, ReactiveSeq<T>> branches =
            triplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R1> branch1 = branches._1().parallel(fj, path1);
    ReactiveSeq<R2> branch2 = branches._2().parallel(fj, path2);
    ReactiveSeq<R3> branch3 = branches._3().parallel(fj, path3);

    return branch1.zip3(branch2, branch3, zipFn);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Fans this stream out into four copies, routes each copy through its own path using
 * {@code parallel(fj, path)}, then merges the four branches with {@code mergeP}.
 *
 * @param fj    pool passed to each {@code parallel} call
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param path3 transformation for the third copy
 * @param path4 transformation for the fourth copy
 * @return the result of {@code mergeP} across the four branches
 */
default <R> ReactiveSeq<R> parallelFanOut(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path3,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path4) {
    // ArrayDeques from this supplier start with capacity 100
    // (presumably used by quadruplicate as per-copy buffers - TODO confirm).
    val branches = quadruplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R> branch1 = branches._1().parallel(fj, path1);
    ReactiveSeq<R> branch2 = branches._2().parallel(fj, path2);
    ReactiveSeq<R> branch3 = branches._3().parallel(fj, path3);
    ReactiveSeq<R> branch4 = branches._4().parallel(fj, path4);

    return branch1.mergeP(branch2, branch3, branch4);
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Runs {@code parallel(fj, fn)} on the stream returned by {@code stream()} and wraps the
 * outcome via {@code fromStream} so the result is a {@code FutureStream}.
 *
 * @param fj pool forwarded unchanged to the underlying {@code parallel} call
 * @param fn transformation forwarded unchanged to the underlying {@code parallel} call
 * @return the result of {@code fromStream} applied to the delegated stream
 */
@Override
default <R> FutureStream<R> parallel(ForkJoinPool fj, Function<? super Stream<U>, ? extends Stream<? extends R>> fn) {
    final ReactiveSeq<R> delegated = stream().parallel(fj, fn);
    return fromStream(delegated);
}
@Override
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Fans this stream out into three copies, routes each copy through its own path using
 * {@code parallel(fj, path)}, then merges the three branches with {@code mergeP}.
 *
 * @param fj    pool passed to each {@code parallel} call
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param path3 transformation for the third copy
 * @return the result of {@code mergeP} across the three branches
 */
default <R> ReactiveSeq<R> parallelFanOut(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path3) {
    // ArrayDeques from this supplier start with capacity 100
    // (presumably used by triplicate as per-copy buffers - TODO confirm).
    Tuple3<ReactiveSeq<T>, ReactiveSeq<T>, ReactiveSeq<T>> d =
            triplicate(() -> new ArrayDeque<T>(100));

    // The paths are applied solely through parallel(...); nothing is computed and then
    // thrown away before the branches are built.
    ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
    ReactiveSeq<R> res3 = d._3().parallel(fj, path3);

    return res1.mergeP(res2, res3);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Fans this stream out into two copies, routes each copy through its own path using
 * {@code parallel(fj, path)}, then recombines the two branches element-wise with {@code zipFn}.
 *
 * @param fj    pool passed to both {@code parallel} calls
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param zipFn function given to {@code zip} to combine one element per branch
 * @return the result of {@code zip} across the two branches
 */
default <R1,R2,R3> ReactiveSeq<R3> parallelFanOutZipIn(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R2>> path2,
        BiFunction<? super R1, ? super R2, ? extends R3> zipFn) {
    // ArrayDeques from this supplier start with capacity 100
    // (presumably used by duplicate as per-copy buffers - TODO confirm).
    Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(() -> new ArrayDeque<T>(100));

    // The paths are applied solely through parallel(...); nothing is computed and then
    // thrown away before the branches are built.
    ReactiveSeq<R1> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R2> res2 = d._2().parallel(fj, path2);

    return res1.zip(res2, zipFn);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Fans this stream out into two copies, routes each copy through its own path using
 * {@code parallel(fj, path)}, then merges the two branches with {@code mergeP}.
 *
 * @param fj    pool passed to both {@code parallel} calls
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @return the result of {@code mergeP} across the two branches
 */
default <R> ReactiveSeq<R> parallelFanOut(
        ForkJoinPool fj,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
        Function<? super Stream<T>, ? extends Stream<? extends R>> path2) {
    // ArrayDeques from this supplier start with capacity 100
    // (presumably used by duplicate as per-copy buffers - TODO confirm).
    Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(() -> new ArrayDeque<T>(100));

    // The paths are applied solely through parallel(...); nothing is computed and then
    // thrown away before the branches are built.
    ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R> res2 = d._2().parallel(fj, path2);

    return res1.mergeP(res2);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
内容来源于网络,如有侵权,请联系作者删除!