Usage and code examples of the cyclops.reactive.ReactiveSeq.merge() method

x33g5p2x, reposted on 2022-01-29 in Other

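This article collects code examples for the Java method cyclops.reactive.ReactiveSeq.merge and shows how ReactiveSeq.merge is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the ReactiveSeq.merge method are as follows:
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: merge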

Introduction to ReactiveSeq.merge

A potentially asynchronous merge operation in which data from each publisher may arrive out of order (if the publishers are configured to publish asynchronously). Pull-based Streams can use the QueueFactory parameter to control the maximum number of queued elements (see QueueFactories). Push-based reactive-streams signal demand via their subscription.
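To make the description above concrete, here is a minimal JDK-only sketch of the general idea: several sources publish asynchronously into one bounded queue, and a single consumer drains it. It is not the cyclops implementation and does not use the cyclops API. The class name MergeSketch, the merge method, and the DONE sentinel are hypothetical names chosen for illustration, and the capacity argument only stands in for the role the description assigns to QueueFactory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// JDK-only illustration. MergeSketch is a hypothetical name, not part of cyclops.
public class MergeSketch {

    // Sentinel a producer enqueues once it has no more data to publish.
    private static final Object DONE = new Object();

    // Merges the sources by running one producer thread per source against a
    // shared bounded queue and draining that queue on the calling thread.
    public static List<Integer> merge(List<List<Integer>> sources, int capacity) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(capacity);
        for (List<Integer> source : sources) {
            Thread producer = new Thread(() -> {
                try {
                    for (Integer value : source) {
                        queue.put(value); // blocks while the queue is full
                    }
                    queue.put(DONE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.setDaemon(true);
            producer.start();
        }

        List<Integer> merged = new ArrayList<>();
        int remaining = sources.size();
        try {
            // Keep consuming until every producer has signalled completion.
            while (remaining > 0) {
                Object next = queue.take();
                if (next == DONE) {
                    remaining--;
                } else {
                    merged.add((Integer) next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while merging", e);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Integer> merged = merge(
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(10, 20, 30)), 2);
        // The interleaving of the two sources can differ from run to run.
        System.out.println(merged);
    }
}
```

In this sketch each source keeps its own relative order, because a single producer enqueues its values in sequence and the queue is FIFO, but the interleaving between sources is not deterministic. That is why several of the tests below assert on contents (hasItems, size) rather than on an exact order.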

Code examples

Code example source: origin: aol/cyclops

@Test
public void publishToAndMerge(){
  com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
      .build();
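  // Close the queue once, after a short delay, so the merged stream can complete
  // (the original looped forever, so the helper thread never terminated).
  Thread t=  new Thread( ()-> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Closing!");
    queue.close();
  });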
  t.start();
  assertThat(Spouts.of(1,2,3)
      .publishTo(queue)
      .peek(System.out::println)
      .merge(queue)
      .toList(), Matchers.equalTo(Arrays.asList(1,1,2,2,3,3)));
}

Code example source: origin: aol/cyclops

@Test
public void publishToAndMerge(){
  com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
                    .build();
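  // Close the queue once, after a short delay, so the merged stream can complete
  // (the original looped forever, so the helper thread never terminated).
  Thread t=  new Thread( ()-> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Closing!");
    queue.close();
  });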
  t.start();
  assertThat(ReactiveSeq.of(1,2,3)
             .publishTo(queue)
             .peek(System.out::println)
             .merge(queue)
             .toList(),equalTo(Arrays.asList(1,1,2,2,3,3)));
}

Code example source: origin: aol/cyclops

@Test
public void mergeAdapterTest() {
  for (int k = 0; k < ITERATIONS; k++) {
    Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
        .build();
    Thread t = new Thread(() -> {
      queue.add(1);
      queue.add(2);
      queue.add(3);
      try {
        System.out.println("Sleeping!");
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Waking!");
      System.out.println("Closing! " + queue.size());
      queue.close();
    });
    t.start();
    assertThat(this.<Integer>of().peek(i -> System.out.println("publishing " + i))
        .merge(queue).collect(Collectors.toList()), equalTo(Arrays.asList(1, 2, 3)));
    t = null;
    System.gc();
  }
}

Code example source: origin: aol/cyclops

@Test
public void publishToAndMerge() {
  for (int k = 0; k < ITERATIONS; k++) {
    System.out.println("publishToAndMerge iteration " + k);
    com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
        .build();
    Thread t = new Thread(() -> {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Closing! " + queue.size());
      queue.close();
    });
    t.start();
    List<Integer> list = of(1, 2, 3)
        .publishTo(queue)
        .peek(System.out::println)
        .merge(queue)
        .toList();
    assertThat(list, hasItems(1, 2, 3));
    assertThat(list.size(), equalTo(6));
    System.gc();
  }
}

Code example source: origin: aol/cyclops

@Test
public void mergeAdapterTest1() {
  for (int k = 0; k < ITERATIONS; k++) {
    System.out.println("Test iteration " + k);
    Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
        .build();
    Thread t = new Thread(() -> {
      queue.add(1);
      queue.add(2);
      queue.add(3);
      try {
        //    System.out.println("Sleeping!");
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      //   System.out.println("Closing! " + queue.size());
      queue.close();
    });
    t.start();
    assertThat(this.<Integer>of(10).peek(i -> System.out.println("publishing " + i))
        .merge(queue).collect(Collectors.toList()), hasItems(10, 1, 2, 3));
    t = null;
    System.gc();
  }
}

Code example source: origin: com.oath.cyclops/cyclops-futurestream

@Override
default FutureStream<U> merge(Adapter<U>... adapters) {
  return fromStream(stream().merge(adapters));
}
