cyclops.reactive.ReactiveSeq.mergeP()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(97)

本文整理了Java中cyclops.reactive.ReactiveSeq.mergeP方法的一些代码示例,展示了ReactiveSeq.mergeP的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.mergeP方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:mergeP

ReactiveSeq.mergeP介绍

[英]A potentially asynchronous merge operation where data from each publisher may arrive out of order (if publishers are configured to publish asynchronously). The QueueFactory parameter can be used by pull-based Streams to control the maximum number of queued elements (@see QueueFactories). Push-based reactive-streams signal demand via their subscription.
[中]一种潜在的异步合并操作,其中来自每个发布者的数据可能会无序到达(如果发布者被配置为异步发布)。基于拉取(pull)的流可以使用QueueFactory参数来控制最大排队元素数(@see QueueFactories)。基于推送(push)的reactive-streams通过其订阅来发出需求信号。

代码示例

代码示例来源:origin: aol/cyclops

/**
 * Merges this stream with the supplied publishers by delegating to
 * {@code mergeP}; both arguments are passed through unchanged.
 *
 * @param factory    queue factory forwarded to {@code mergeP}
 * @param publishers publishers forwarded to {@code mergeP}
 * @return whatever {@code mergeP(factory, publishers)} returns
 */
default ReactiveSeq<T> merge(final QueueFactory<T> factory, final Publisher<T>... publishers) {
  final ReactiveSeq<T> merged = mergeP(factory, publishers);
  return merged;
}
/**

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into two copies via {@code multicast(2)}, sends each copy
 * down its own path and merges the two results with {@code mergeP}.
 *
 * @param path1 transformation applied to the copy at index 0
 * @param path2 transformation applied to the copy at index 1
 * @param <R>   element type of the merged result
 * @return the output of {@code path1} merged with the output of {@code path2}
 */
default <R> ReactiveSeq<R> fanOut(
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2) {
  final Seq<ReactiveSeq<T>> copies = multicast(2);

  // path2 is applied first, then path1; an empty stream stands in for a missing copy.
  final ReactiveSeq<T> secondCopy = copies.getOrElse(1, empty());
  final Publisher<R> secondResult = (Publisher<R>) path2.apply(secondCopy);

  final ReactiveSeq<T> firstCopy = copies.getOrElse(0, empty());
  final ReactiveSeq<R> firstResult = (ReactiveSeq<R>) path1.apply(firstCopy);

  return firstResult.mergeP(secondResult);
}

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into three copies via {@code multicast(3)}, sends each copy
 * down its own path and merges the three results with {@code mergeP}.
 *
 * @param path1 transformation applied to the copy at index 0
 * @param path2 transformation applied to the copy at index 1
 * @param path3 transformation applied to the copy at index 2
 * @param <R>   element type of the merged result
 * @return the output of {@code path1} merged with the outputs of the other paths
 */
default <R> ReactiveSeq<R> fanOut(
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3) {
  final Seq<ReactiveSeq<T>> copies = multicast(3);

  // Paths 2 and 3 are applied before path 1; an empty stream stands in for a missing copy.
  final ReactiveSeq<T> secondCopy = copies.getOrElse(1, empty());
  final Publisher<R> secondResult = (Publisher<R>) path2.apply(secondCopy);

  final ReactiveSeq<T> thirdCopy = copies.getOrElse(2, empty());
  final Publisher<R> thirdResult = (Publisher<R>) path3.apply(thirdCopy);

  final ReactiveSeq<T> firstCopy = copies.getOrElse(0, empty());
  final ReactiveSeq<R> firstResult = (ReactiveSeq<R>) path1.apply(firstCopy);

  return firstResult.mergeP(secondResult, thirdResult);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Splits this stream into four copies via {@code multicast(4)}, sends each copy
 * down its own path and merges the four results with {@code mergeP}.
 *
 * @param path1 transformation applied to the copy at index 0
 * @param path2 transformation applied to the copy at index 1
 * @param path3 transformation applied to the copy at index 2
 * @param path4 transformation applied to the copy at index 3
 * @param <R>   element type of the merged result
 * @return the output of {@code path1} merged with the outputs of the other paths
 */
default <R> ReactiveSeq<R> fanOut(
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3,
    Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path4) {
  final Seq<ReactiveSeq<T>> copies = multicast(4);

  // Paths 2..4 are applied before path 1; an empty stream stands in for a missing copy.
  final ReactiveSeq<T> secondCopy = copies.getOrElse(1, empty());
  final Publisher<R> secondResult = (Publisher<R>) path2.apply(secondCopy);

  final ReactiveSeq<T> thirdCopy = copies.getOrElse(2, empty());
  final Publisher<R> thirdResult = (Publisher<R>) path3.apply(thirdCopy);

  final ReactiveSeq<T> fourthCopy = copies.getOrElse(3, empty());
  final Publisher<R> fourthResult = (Publisher<R>) path4.apply(fourthCopy);

  final ReactiveSeq<T> firstCopy = copies.getOrElse(0, empty());
  final ReactiveSeq<R> firstResult = (ReactiveSeq<R>) path1.apply(firstCopy);

  return firstResult.mergeP(secondResult, thirdResult, fourthResult);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Streams the entries of both backing maps as a single sequence: entries of
 * {@code map1} are wrapped with {@code LazyEither::left}, entries of
 * {@code map2} with {@code LazyEither::right}, and the two streams are
 * combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> stream() {
  final ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> leftEntries =
      map1.stream().map(LazyEither::left);
  final ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> rightEntries =
      map2.stream().map(LazyEither::right);
  return leftEntries.mergeP(rightEntries);
}
@Override

代码示例来源:origin: aol/cyclops

/**
 * Duplicates this stream, runs each copy through its own path via
 * {@code parallel(fj, path)} and merges the two results with {@code mergeP}.
 *
 * <p>Each path is handed to {@code parallel} exactly once. An earlier,
 * unused {@code d.map1(path1).map2(path2)} evaluation has been removed
 * because its result was discarded.
 *
 * @param fj    pool passed to {@code parallel} for both copies
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param <R>   element type of the merged result
 * @return the result of the first path merged with the result of the second
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                 Function<? super Stream<T>, ? extends Stream<? extends R>> path2){
  // The supplier hands duplicate() a fresh ArrayDeque with initial capacity 100.
  Tuple2<ReactiveSeq<T>, ReactiveSeq<T>> d = duplicate(()->new ArrayDeque<T>(100));
  ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
  return res1.mergeP(res2);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,

代码示例来源:origin: aol/cyclops

/**
 * Streams the entries of all three backing maps as a single sequence: entries
 * of {@code map1} are wrapped with {@code LazyEither3::left1}, entries of
 * {@code map2} with {@code LazyEither3::left2}, entries of {@code map3} with
 * {@code LazyEither3::right}, and the three streams are combined with
 * {@code mergeP}.
 */
@Override
public ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> stream() {
  final ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> firstEntries =
      map1.stream().map(LazyEither3::left1);
  final ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> secondEntries =
      map2.stream().map(LazyEither3::left2);
  final ReactiveSeq<LazyEither3<Tuple2<K1, V1>, Tuple2<K2, V2>, Tuple2<K3, V3>>> thirdEntries =
      map3.stream().map(LazyEither3::right);
  return firstEntries.mergeP(secondEntries, thirdEntries);
}
@Override

代码示例来源:origin: aol/cyclops

/**
 * Merges three three-element streams 100 times and checks on every run that
 * all nine values are present and that nothing was lost or duplicated.
 * A banner is printed before each iteration.
 */
@Test
public void mergePTest(){
  for (int i = 0; i < 100; i++) {
    // Banner: five plain lines followed by one line with a trailing marker.
    final String banner = "*********************ITERATION " + i;
    for (int repeat = 0; repeat < 5; repeat++) {
      System.out.println(banner);
    }
    System.out.println(banner + "********************");

    final List<Integer> merged = of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).toList();
    final String reason = "List is " + merged;
    // Arrival order is not asserted, only content and size.
    assertThat(reason, merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
    assertThat(reason, merged.size(), Matchers.equalTo(9));
  }
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Makes four copies of this stream with {@code quadruplicate}, runs each copy
 * through its own path via {@code parallel(fj, path)} and merges the four
 * results with {@code mergeP}.
 *
 * @param fj    pool passed to {@code parallel} for every copy
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param path3 transformation for the third copy
 * @param path4 transformation for the fourth copy
 * @param <R>   element type of the merged result
 * @return the result of the first path merged with the results of the others
 */
default <R> ReactiveSeq<R> parallelFanOut(
    ForkJoinPool fj,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path3,
    Function<? super Stream<T>, ? extends Stream<? extends R>> path4) {
  // The supplier hands quadruplicate() a fresh ArrayDeque with initial capacity 100.
  val copies = quadruplicate(() -> new ArrayDeque<T>(100));

  ReactiveSeq<R> first = copies._1().parallel(fj, path1);
  ReactiveSeq<R> second = copies._2().parallel(fj, path2);
  ReactiveSeq<R> third = copies._3().parallel(fj, path3);
  ReactiveSeq<R> fourth = copies._4().parallel(fj, path4);

  return first.mergeP(second, third, fourth);
}

代码示例来源:origin: aol/cyclops

/**
 * Makes three copies of this stream with {@code triplicate}, runs each copy
 * through its own path via {@code parallel(fj, path)} and merges the three
 * results with {@code mergeP}.
 *
 * <p>Each path is handed to {@code parallel} exactly once. An earlier,
 * unused {@code d.map1(path1).map2(path2).map3(path3)} evaluation has been
 * removed because its result was discarded.
 *
 * @param fj    pool passed to {@code parallel} for every copy
 * @param path1 transformation for the first copy
 * @param path2 transformation for the second copy
 * @param path3 transformation for the third copy
 * @param <R>   element type of the merged result
 * @return the result of the first path merged with the results of the others
 */
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
                     Function<? super Stream<T>, ? extends Stream<? extends R>> path2,
                     Function<? super Stream<T>, ? extends Stream<? extends R>> path3){
  // The supplier hands triplicate() a fresh ArrayDeque with initial capacity 100.
  Tuple3<ReactiveSeq<T>, ReactiveSeq<T>,ReactiveSeq<T>> d = triplicate(()->new ArrayDeque<T>(100));
  ReactiveSeq<R> res1 = d._1().parallel(fj, path1);
  ReactiveSeq<R> res2 = d._2().parallel(fj, path2);
  ReactiveSeq<R> res3 = d._3().parallel(fj, path3);
  return res1.mergeP(res2,res3);
}
default <R1,R2,R3,R4> ReactiveSeq<R4> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,

代码示例来源:origin: aol/cyclops

@Override
public ReactiveSeq<Either<K1, K2>> streamKeys() {
  ReactiveSeq<Either<K1, K2>> x = map1.stream().map(t->t._1()).map(LazyEither::left);
  return x.mergeP(map2.stream().map(t->t._1()).map(LazyEither::right));
}

代码示例来源:origin: aol/cyclops

@Override
public ReactiveSeq<Either<V1, V2>> streamValues() {
  ReactiveSeq<Either<V1, V2>> x = map1.stream().map(t->t._2()).map(LazyEither::left);
  return x.mergeP(map2.stream().map(t->t._2()).map(LazyEither::right));
}

代码示例来源:origin: aol/cyclops

/**
 * Merges three three-element streams {@code ITERATIONS} times and checks on
 * every run that all nine values are present and that the size is exactly nine.
 */
@Test
public void mergePTest() {
  int round = 0;
  while (round < ITERATIONS) {
    final List<Integer> merged = of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).toList();
    // Arrival order is not asserted, only content and size.
    assertThat(merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
    assertThat(merged.size(), Matchers.equalTo(9));
    round++;
  }
}

代码示例来源:origin: aol/cyclops

/**
 * Duplicates a four-element Spout (the copies are not consumed here), then
 * merges a three-element Spout with a two-element ReactiveSeq and prints the
 * merged elements.
 */
@Test
public void duplicateTest(){
  final Tuple2<ReactiveSeq<Integer>, ReactiveSeq<Integer>> copies = Spouts.of(1, 2, 3, 4).duplicate();
  // Disabled experiments:
  //   copies._1.printOut();
  //   copies._2.printOut();
  System.out.println("Merge!");
  //   copies._1.mergeP(copies._2).printOut();
  final ReactiveSeq<String> letters = Spouts.of("a","b","c");
  final ReactiveSeq<String> doubled = ReactiveSeq.of("bb","cc");
  letters.mergeP(doubled).printOut();
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Merges three three-element streams 100 times and checks on every run that
 * all nine values are present and that the size is exactly nine. The merged
 * list is included in the failure message.
 */
@Test
public void mergePTest(){
  for (int attempt = 0; attempt < 100; attempt++) {
    final List<Integer> merged = of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).toList();
    final String reason = "List is " + merged;
    // Arrival order is not asserted, only content and size.
    assertThat(reason, merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
    assertThat(reason, merged.size(), Matchers.equalTo(9));
  }
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Merges three three-element streams {@code ITERATIONS} times and checks on
 * every run that all nine values are present and that the size is exactly
 * nine. The merged list is included in the failure message.
 */
@Test
public void mergePTest(){
  // Disabled debug output:
  //System.out.println(of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).listX());
  int round = 0;
  while (round < ITERATIONS) {
    final List<Integer> merged = of(3, 6, 9).mergeP(of(2, 4, 8), of(1, 5, 7)).toList();
    final String reason = "List is " + merged;
    assertThat(reason, merged, hasItems(1, 2, 3, 4, 5, 6, 7, 8, 9));
    assertThat(reason, merged.size(), Matchers.equalTo(9));
    round++;
  }
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Merges three three-element {@code ReactiveSeq}s once and checks that all
 * nine values are present and that the size is exactly nine.
 */
@Test
public void mergePTest(){
  final ReactiveSeq<Integer> base = ReactiveSeq.of(3,6,9);
  final List<Integer> merged = base
      .mergeP(ReactiveSeq.of(2,4,8), ReactiveSeq.of(1,5,7))
      .toList();
  // Arrival order is not asserted, only content and size.
  assertThat(merged, hasItems(1,2,3,4,5,6,7,8,9));
  assertThat(merged.size(), equalTo(9));
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Streams the values of all three backing maps as a single sequence: the
 * second tuple component of each entry is wrapped with
 * {@code LazyEither3::left1} (map1), {@code LazyEither3::left2} (map2) or
 * {@code LazyEither3::right} (map3), and the three streams are combined with
 * {@code mergeP}.
 */
@Override
public ReactiveSeq<LazyEither3<V1, V2, V3>> streamValues() {
  final ReactiveSeq<LazyEither3<V1, V2, V3>> firstValues =
      map1.stream().map(entry -> entry._2()).map(LazyEither3::left1);
  final ReactiveSeq<LazyEither3<V1, V2, V3>> secondValues =
      map2.stream().map(entry -> entry._2()).map(LazyEither3::left2);
  final ReactiveSeq<LazyEither3<V1, V2, V3>> thirdValues =
      map3.stream().map(entry -> entry._2()).map(LazyEither3::right);
  return firstValues.mergeP(secondValues, thirdValues);
}

代码示例来源:origin: aol/cyclops

/**
 * Streams the keys of all three backing maps as a single sequence: the first
 * tuple component of each entry is wrapped with {@code LazyEither3::left1}
 * (map1), {@code LazyEither3::left2} (map2) or {@code LazyEither3::right}
 * (map3), and the three streams are combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<LazyEither3<K1, K2, K3>> streamKeys() {
  final ReactiveSeq<LazyEither3<K1, K2, K3>> firstKeys =
      map1.stream().map(entry -> entry._1()).map(LazyEither3::left1);
  final ReactiveSeq<LazyEither3<K1, K2, K3>> secondKeys =
      map2.stream().map(entry -> entry._1()).map(LazyEither3::left2);
  final ReactiveSeq<LazyEither3<K1, K2, K3>> thirdKeys =
      map3.stream().map(entry -> entry._1()).map(LazyEither3::right);
  return firstKeys.mergeP(secondKeys, thirdKeys);
}
@Override

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Streams the entries of both backing maps as a single sequence: entries of
 * {@code map1} are wrapped with {@code LazyEither::left}, entries of
 * {@code map2} with {@code LazyEither::right}, and the two streams are
 * combined with {@code mergeP}.
 */
@Override
public ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> stream() {
  final ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> leftEntries =
      map1.stream().map(LazyEither::left);
  final ReactiveSeq<Either<Tuple2<K1, V1>, Tuple2<K2, V2>>> rightEntries =
      map2.stream().map(LazyEither::right);
  return leftEntries.mergeP(rightEntries);
}
@Override

相关文章

微信公众号

最新文章

更多

ReactiveSeq类方法