本文整理了Java中cyclops.reactive.ReactiveSeq.mergeP
方法的一些代码示例,展示了ReactiveSeq.mergeP
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.mergeP
方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:mergeP
[英]A potentially asynchronous merge operation where data from each publisher may arrive out of order (if publishers are configured to publish asynchronously). The QueueFactory parameter can be used by pull-based Streams to control the maximum number of queued elements (@see QueueFactories). Push-based reactive-streams signal demand via their subscription.
[中]一种潜在的异步合并操作,其中来自每个发布者的数据可能会无序到达(如果发布者被配置为异步发布)。基于拉取(pull)的流可以使用QueueFactory参数来控制最大排队元素数量(@see QueueFactories)。基于推送(push)的reactive-streams通过其订阅来发出需求信号。
代码示例来源:origin: aol/cyclops
/**
 * Merges this stream with the supplied publishers by delegating to
 * {@code mergeP(factory, publishers)}.
 *
 * @param factory    queue factory forwarded unchanged to {@code mergeP}
 * @param publishers publishers forwarded unchanged to {@code mergeP}
 * @return whatever {@code mergeP} returns for the same arguments
 */
default ReactiveSeq<T> merge(final QueueFactory<T> factory,
                             final Publisher<T>... publishers) {
    final ReactiveSeq<T> merged = mergeP(factory, publishers);
    return merged;
}
/**
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into two branches with {@code multicast(2)}, applies one path function
 * to each branch, and merges the two results with {@code mergeP}.
 *
 * @param path1 function applied to the branch at index 0; its result is the merge receiver
 * @param path2 function applied to the branch at index 1; its result is merged in as a Publisher
 * @return the merged stream of both paths' results
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2) {
    final Seq<ReactiveSeq<T>> branches = multicast(2);

    // path2 is applied before path1, as in the original ordering.
    final Publisher<R> second = (Publisher<R>) path2.apply(branches.getOrElse(1, empty()));
    final ReactiveSeq<R> first = (ReactiveSeq<R>) path1.apply(branches.getOrElse(0, empty()));

    return first.mergeP(second);
}
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into three branches with {@code multicast(3)}, applies one path function
 * to each branch, and merges the three results with {@code mergeP}.
 *
 * @param path1 function applied to the branch at index 0; its result is the merge receiver
 * @param path2 function applied to the branch at index 1
 * @param path3 function applied to the branch at index 2
 * @return the merged stream of all three paths' results
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3) {
    final Seq<ReactiveSeq<T>> branches = multicast(3);

    // Paths 2 and 3 are applied before path 1, as in the original ordering.
    final Publisher<R> second = (Publisher<R>) path2.apply(branches.getOrElse(1, empty()));
    final Publisher<R> third = (Publisher<R>) path3.apply(branches.getOrElse(2, empty()));
    final ReactiveSeq<R> first = (ReactiveSeq<R>) path1.apply(branches.getOrElse(0, empty()));

    return first.mergeP(second, third);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Splits this stream into four branches with {@code multicast(4)}, applies one path function
 * to each branch, and merges the four results with {@code mergeP}.
 *
 * @param path1 function applied to the branch at index 0; its result is the merge receiver
 * @param path2 function applied to the branch at index 1
 * @param path3 function applied to the branch at index 2
 * @param path4 function applied to the branch at index 3
 * @return the merged stream of all four paths' results
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path4) {
    final Seq<ReactiveSeq<T>> branches = multicast(4);

    // Paths 2-4 are applied before path 1, as in the original ordering.
    final Publisher<R> second = (Publisher<R>) path2.apply(branches.getOrElse(1, empty()));
    final Publisher<R> third = (Publisher<R>) path3.apply(branches.getOrElse(2, empty()));
    final Publisher<R> fourth = (Publisher<R>) path4.apply(branches.getOrElse(3, empty()));
    final ReactiveSeq<R> first = (ReactiveSeq<R>) path1.apply(branches.getOrElse(0, empty()));

    return first.mergeP(second, third, fourth);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Streams the entries of both underlying maps as a single merged stream: entries of
 * {@code map1} are wrapped with {@code LazyEither::left}, entries of {@code map2} with
 * {@code LazyEither::right}, and the two streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> stream() {
    ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> leftEntries =
            map1.stream().map(LazyEither::left);
    ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> rightEntries =
            map2.stream().map(LazyEither::right);
    return leftEntries.mergeP(rightEntries);
}
@Override
代码示例来源:origin: aol/cyclops
/**
 * Duplicates this stream, runs each copy through its own path function via
 * {@code parallel(fj, path)}, and merges the two results with {@code mergeP}.
 *
 * @param fj    pool passed to {@code parallel} for both copies
 * @param path1 transformation applied to the first copy
 * @param path2 transformation applied to the second copy
 * @return the merged stream of both paths' results
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path2){
    // Each copy is backed by an ArrayDeque created with an initial capacity of 100.
    Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(()->new ArrayDeque<T>(100));

    // Each path function is handed only to parallel(...). An extra, never-read
    // d.map1(path1).map2(path2) local was dropped so the paths are not applied to the
    // copies a second time (the four-path overload has no such step either).
    ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
    return res1.mergeP(res2);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Streams the entries of the three underlying maps as a single merged stream: entries of
 * {@code map1} are wrapped with {@code LazyEither3::left1}, entries of {@code map2} with
 * {@code LazyEither3::left2}, entries of {@code map3} with {@code LazyEither3::right}, and
 * the three streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> stream() {
    ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> firstEntries =
            map1.stream().map(LazyEither3::left1);
    ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> secondEntries =
            map2.stream().map(LazyEither3::left2);
    ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> thirdEntries =
            map3.stream().map(LazyEither3::right);
    return firstEntries.mergeP(secondEntries, thirdEntries);
}
@Override
代码示例来源:origin: aol/cyclops
/**
 * Merges three three-element streams 100 times and checks on every run that the merged
 * list contains each of 1..9 and has exactly nine elements. Order is not asserted.
 */
@Test
public void mergePTest(){
    for(int i=0;i<100;i++) {
        // One banner per iteration is enough to locate a failing run in the output;
        // the five copy-pasted duplicates of this line were removed.
        System.out.println("*********************ITERATION " + i + "********************");
        List<Integer> list = of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).toList();
        assertThat("List is " + list,list, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
        assertThat("List is " + list,list.size(), Matchers.equalTo(9));
    }
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Makes four copies of this stream with {@code quadruplicate}, runs each copy through its
 * own path function via {@code parallel(fj, path)}, and merges the four results with
 * {@code mergeP}.
 *
 * @param fj    pool passed to {@code parallel} for every copy
 * @param path1 transformation applied to the first copy; its result is the merge receiver
 * @param path2 transformation applied to the second copy
 * @param path3 transformation applied to the third copy
 * @param path4 transformation applied to the fourth copy
 * @return the merged stream of all four paths' results
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path3,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path4) {
    // Each copy is backed by an ArrayDeque created with an initial capacity of 100.
    val copies = quadruplicate(() -> new ArrayDeque<T>(100));

    ReactiveSeq<R> first = copies._1().parallel(fj, path1);
    ReactiveSeq<R> second = copies._2().parallel(fj, path2);
    ReactiveSeq<R> third = copies._3().parallel(fj, path3);
    ReactiveSeq<R> fourth = copies._4().parallel(fj, path4);

    return first.mergeP(second, third, fourth);
}
代码示例来源:origin: aol/cyclops
/**
 * Makes three copies of this stream with {@code triplicate}, runs each copy through its own
 * path function via {@code parallel(fj, path)}, and merges the three results with
 * {@code mergeP}.
 *
 * @param fj    pool passed to {@code parallel} for every copy
 * @param path1 transformation applied to the first copy; its result is the merge receiver
 * @param path2 transformation applied to the second copy
 * @param path3 transformation applied to the third copy
 * @return the merged stream of all three paths' results
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
                                          Function<? super Stream<T>, ? extends Stream<? extends R>> path3){
    // Each copy is backed by an ArrayDeque created with an initial capacity of 100.
    Tuple3<ReactiveSeq<T>, ReactiveSeq<T>,ReactiveSeq<T>> d = triplicate(()->new ArrayDeque<T>(100));

    // Each path function is handed only to parallel(...). An extra, never-read
    // d.map1(path1).map2(path2).map3(path3) local was dropped so the paths are not applied
    // to the copies a second time (the four-path overload has no such step either).
    ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
    ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
    ReactiveSeq<R> res3 = d._3().parallel(fj, path3);
    return res1.mergeP(res2,res3);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Streams the keys of both underlying maps as a single merged stream: the first tuple
 * component ({@code _1()}) of each {@code map1} entry is wrapped with
 * {@code LazyEither::left}, that of each {@code map2} entry with {@code LazyEither::right},
 * and the two streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<Either<K1, K2>> streamKeys() {
    ReactiveSeq<Either<K1, K2>> leftKeys =
            map1.stream().map(entry -> entry._1()).map(LazyEither::left);
    ReactiveSeq<Either<K1, K2>> rightKeys =
            map2.stream().map(entry -> entry._1()).map(LazyEither::right);
    return leftKeys.mergeP(rightKeys);
}
代码示例来源:origin: aol/cyclops
/**
 * Streams the values of both underlying maps as a single merged stream: the second tuple
 * component ({@code _2()}) of each {@code map1} entry is wrapped with
 * {@code LazyEither::left}, that of each {@code map2} entry with {@code LazyEither::right},
 * and the two streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<Either<V1, V2>> streamValues() {
    ReactiveSeq<Either<V1, V2>> leftValues =
            map1.stream().map(entry -> entry._2()).map(LazyEither::left);
    ReactiveSeq<Either<V1, V2>> rightValues =
            map2.stream().map(entry -> entry._2()).map(LazyEither::right);
    return leftValues.mergeP(rightValues);
}
代码示例来源:origin: aol/cyclops
/**
 * Merges three three-element streams {@code ITERATIONS} times and checks on every run that
 * the merged list contains each of 1..9 and has exactly nine elements. Order is not asserted.
 */
@Test
public void mergePTest() {
    int run = 0;
    while (run < ITERATIONS) {
        List<Integer> merged = of(3, 6, 9)
                .mergeP(of(2, 4, 8), of(1, 5, 7))
                .toList();
        assertThat(merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
        assertThat(merged.size(), Matchers.equalTo(9));
        run++;
    }
}
代码示例来源:origin: aol/cyclops
/**
 * Smoke test without assertions: duplicates a four-element Spouts stream, then prints the
 * result of merging a Spouts stream of strings with a ReactiveSeq of strings.
 */
@Test
public void duplicateTest(){
    Tuple2<ReactiveSeq<Integer>, ReactiveSeq<Integer>> copies = Spouts.of(1, 2, 3, 4).duplicate();
    // Disabled debugging output for the duplicated streams:
    // copies._1.printOut();
    // copies._2.printOut();
    System.out.println("Merge!");
    // copies._1.mergeP(copies._2).printOut();
    Spouts.of("a","b","c")
          .mergeP(ReactiveSeq.of("bb","cc"))
          .printOut();
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Merges three three-element streams 100 times and checks on every run that the merged
 * list contains each of 1..9 and has exactly nine elements. Order is not asserted.
 */
@Test
public void mergePTest(){
    int run = 0;
    while (run < 100) {
        List<Integer> merged = of(3, 6, 9)
                .mergeP(of(2, 4, 8), of(1, 5, 7))
                .toList();
        assertThat("List is " + merged, merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
        assertThat("List is " + merged, merged.size(), Matchers.equalTo(9));
        run++;
    }
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Merges three three-element streams {@code ITERATIONS} times and checks on every run that
 * the merged list contains each of 1..9 and has exactly nine elements. Order is not asserted.
 */
@Test
public void mergePTest(){
    // Disabled debugging aid:
    // System.out.println(of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).listX());
    int run = 0;
    while (run < ITERATIONS) {
        List<Integer> merged = of(3, 6, 9)
                .mergeP(of(2, 4, 8), of(1, 5, 7))
                .toList();
        assertThat("List is " + merged, merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
        assertThat("List is " + merged, merged.size(), Matchers.equalTo(9));
        run++;
    }
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Merges three three-element ReactiveSeq instances once and checks that the merged list
 * contains each of 1..9 and has exactly nine elements. Order is not asserted.
 */
@Test
public void mergePTest(){
    List<Integer> merged = ReactiveSeq.of(3,6,9)
            .mergeP(ReactiveSeq.of(2,4,8), ReactiveSeq.of(1,5,7))
            .toList();

    assertThat(merged, hasItems(1,2,3,4,5,6,7,8,9));
    assertThat(merged.size(), equalTo(9));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Streams the values of the three underlying maps as a single merged stream: the second
 * tuple component ({@code _2()}) of each entry is wrapped with {@code LazyEither3::left1}
 * for {@code map1}, {@code LazyEither3::left2} for {@code map2} and
 * {@code LazyEither3::right} for {@code map3}; the streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<LazyEither3<V1, V2, V3>> streamValues() {
    ReactiveSeq<LazyEither3<V1, V2, V3>> firstValues =
            map1.stream().map(entry -> entry._2()).map(LazyEither3::left1);
    ReactiveSeq<LazyEither3<V1, V2, V3>> secondValues =
            map2.stream().map(entry -> entry._2()).map(LazyEither3::left2);
    ReactiveSeq<LazyEither3<V1, V2, V3>> thirdValues =
            map3.stream().map(entry -> entry._2()).map(LazyEither3::right);
    return firstValues.mergeP(secondValues, thirdValues);
}
代码示例来源:origin: aol/cyclops
/**
 * Streams the keys of the three underlying maps as a single merged stream: the first tuple
 * component ({@code _1()}) of each entry is wrapped with {@code LazyEither3::left1} for
 * {@code map1}, {@code LazyEither3::left2} for {@code map2} and {@code LazyEither3::right}
 * for {@code map3}; the streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<LazyEither3<K1, K2, K3>> streamKeys() {
    ReactiveSeq<LazyEither3<K1, K2, K3>> firstKeys =
            map1.stream().map(entry -> entry._1()).map(LazyEither3::left1);
    ReactiveSeq<LazyEither3<K1, K2, K3>> secondKeys =
            map2.stream().map(entry -> entry._1()).map(LazyEither3::left2);
    ReactiveSeq<LazyEither3<K1, K2, K3>> thirdKeys =
            map3.stream().map(entry -> entry._1()).map(LazyEither3::right);
    return firstKeys.mergeP(secondKeys, thirdKeys);
}
@Override
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Streams the entries of both underlying maps as a single merged stream: entries of
 * {@code map1} are wrapped with {@code LazyEither::left}, entries of {@code map2} with
 * {@code LazyEither::right}, and the two streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> stream() {
    ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> leftEntries =
            map1.stream().map(LazyEither::left);
    ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> rightEntries =
            map2.stream().map(LazyEither::right);
    return leftEntries.mergeP(rightEntries);
}
@Override
内容来源于网络,如有侵权,请联系作者删除!