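This article collects code examples for the Java method cyclops.reactive.ReactiveSeq.merge and shows how ReactiveSeq.merge is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the ReactiveSeq.merge method are as follows: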
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: merge
A potentially asynchronous merge operation in which data from each publisher may arrive out of order (if the publishers are configured to publish asynchronously). The QueueFactory parameter can be used by pull-based Streams to control the maximum number of queued elements (see QueueFactories). Push-based reactive-streams signal demand via their subscription.
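To make the description above concrete, here is a minimal sketch that uses only the Java standard library. It is not the cyclops implementation and does not call the cyclops API; the class MergeSketch, its merge helper, and the DONE sentinel are hypothetical names introduced purely for illustration. Each source is published from its own thread into a bounded queue (standing in for the role the description gives to QueueFactory), so every element arrives, but the relative order across sources is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MergeSketch {
    // Hypothetical sentinel: each producer puts it on the queue when it has finished.
    private static final Object DONE = new Object();

    /**
     * Illustrative merge (an assumption, not cyclops code): every source is
     * published from its own thread into one bounded queue, and the caller
     * drains that queue until all producers have signalled completion.
     */
    @SafeVarargs
    public static <T> List<T> merge(int capacity, List<T>... sources) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        for (List<T> source : sources) {
            Thread producer = new Thread(() -> {
                try {
                    for (T item : source) {
                        queue.put(item); // blocks while the bounded queue is full
                    }
                    queue.put(DONE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.setDaemon(true); // do not keep the JVM alive for this sketch
            producer.start();
        }

        List<T> merged = new ArrayList<>();
        int finished = 0;
        try {
            while (finished < sources.length) {
                Object next = queue.take();
                if (next == DONE) {
                    finished++;
                } else {
                    @SuppressWarnings("unchecked")
                    T item = (T) next;
                    merged.add(item);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while merging", e);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Integer> merged = merge(2, Arrays.asList(1, 2, 3), Arrays.asList(10, 20, 30));
        // Arrival order can differ from run to run, so it is printed without a fixed expectation.
        System.out.println("arrival order: " + merged);
        List<Integer> sorted = new ArrayList<>(merged);
        Collections.sort(sorted);
        System.out.println("sorted: " + sorted); // prints "sorted: [1, 2, 3, 10, 20, 30]"
    }
}
```

Because the interleaving depends on thread scheduling, a caller of such a merge can rely on which elements are present, but not on their order across sources. The order within a single source is preserved here, since each producer puts its items on the queue sequentially.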
Code example source: origin: aol/cyclops
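@Test
public void publishToAndMerge(){
    com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
            .build();
    // Background thread that closes the queue every 500 ms.
    // Note: the loop never exits, so this thread keeps running after the test returns.
    Thread t= new Thread( ()-> {
        while(true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Closing!");
            queue.close();
        }
    });
    t.start();
    // Each element is published to the queue and the queue is merged back in,
    // so the assertion expects every value twice.
    assertThat(Spouts.of(1,2,3)
            .publishTo(queue)
            .peek(System.out::println)
            .merge(queue)
            .toList(), Matchers.equalTo(Arrays.asList(1,1,2,2,3,3)));
}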
Code example source: origin: aol/cyclops
@Test
public void publishToAndMerge(){
    com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
            .build();
    // Background thread that closes the queue every 500 ms.
    // Note: the loop never exits, so this thread keeps running after the test returns.
    Thread t= new Thread( ()-> {
        while(true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Closing!");
            queue.close();
        }
    });
    t.start();
    // Same scenario as the Spouts variant, but starting from ReactiveSeq.of.
    assertThat(ReactiveSeq.of(1,2,3)
            .publishTo(queue)
            .peek(System.out::println)
            .merge(queue)
            .toList(),equalTo(Arrays.asList(1,1,2,2,3,3)));
}
Code example source: origin: aol/cyclops
@Test
public void mergeAdapterTest() {
    for (int k = 0; k < ITERATIONS; k++) {
        Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
                .build();
        Thread t = new Thread(() -> {
            queue.add(1);
            queue.add(2);
            queue.add(3);
            try {
                System.out.println("Sleeping!");
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Waking!");
            System.out.println("Closing! " + queue.size());
            queue.close();
        });
        t.start();
        assertThat(this.<Integer>of().peek(i -> System.out.println("publishing " + i))
                .merge(queue).collect(Collectors.toList()), equalTo(Arrays.asList(1, 2, 3)));
        t = null;
        System.gc();
    }
}
Code example source: origin: aol/cyclops
@Test
public void publishToAndMerge() {
    for (int k = 0; k < ITERATIONS; k++) {
        System.out.println("Publish toNested and zip iteration " + k);
        com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
                .build();
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Closing! " + queue.size());
            queue.close();
        });
        t.start();
        AtomicBoolean complete = new AtomicBoolean(false);
        AtomicBoolean start = new AtomicBoolean(false);
        List<Integer> list = of(1, 2, 3)
                .publishTo(queue)
                .peek(System.out::println)
                .merge(queue)
                .toList();
        assertThat(list, hasItems(1, 2, 3));
        assertThat(list.size(), equalTo(6));
        System.gc();
    }
}
Code example source: origin: aol/cyclops
@Test
public void mergeAdapterTest1() {
    for (int k = 0; k < ITERATIONS; k++) {
        System.out.println("Test iteration " + k);
        Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
                .build();
        Thread t = new Thread(() -> {
            queue.add(1);
            queue.add(2);
            queue.add(3);
            try {
                // System.out.println("Sleeping!");
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // System.out.println("Closing! " + queue.size());
            queue.close();
        });
        t.start();
        assertThat(this.<Integer>of(10).peek(i -> System.out.println("publishing " + i))
                .merge(queue).collect(Collectors.toList()), hasItems(10, 1, 2, 3));
        t = null;
        System.gc();
    }
}
Code example source: origin: com.oath.cyclops/cyclops-futurestream
@Override
default FutureStream<U> merge(Adapter<U>... adapters) {
    return fromStream(stream().merge(adapters));
}