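This article collects code examples for the Java method cyclops.reactive.ReactiveSeq.hotStream and shows how ReactiveSeq.hotStream is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the ReactiveSeq.hotStream method are as follows:
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: hotStream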
Turns this ReactiveSeq into a Connectable (a connectable Stream) that produces data on a thread of the supplied executor. Note that this method creates a Connectable that starts emitting data immediately. For a hot stream that waits until the first consumer connects, see ReactiveSeq#primedHotStream(Executor). The generated Connectable is not pausable; for a pausable Connectable, see ReactiveSeq#pausableHotStream(Executor).
Connectable ints = ReactiveSeq.range(0,Integer.MAX_VALUE)
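To make the "starts emitting immediately" behaviour concrete, here is a minimal JDK-only sketch. It does not use cyclops: HotStartSketch and demo are hypothetical names, and the subscriber list is only a stand-in, since the description above does not say how a Connectable works internally. The producer runs on the executor as soon as it is submitted, so a consumer that connects after the producer has finished sees nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration of "hot" semantics using only the JDK; not the cyclops implementation.
final class HotStartSketch {

    /** Returns {elements produced, elements seen by a consumer that connected too late}. */
    static int[] demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        List<Consumer<Integer>> subscribers = new CopyOnWriteArrayList<>();
        AtomicInteger produced = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);

        // "Hot": production starts right away, whether or not anyone is connected.
        exec.execute(() -> {
            for (int i = 1; i <= 3; i++) {
                produced.incrementAndGet();
                for (Consumer<Integer> s : subscribers) {
                    s.accept(i);
                }
            }
            finished.countDown();
        });

        try {
            finished.await(); // let the producer run to completion first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }

        // A consumer connecting now has missed every element.
        List<Integer> late = new ArrayList<>();
        subscribers.add(late::add);
        return new int[] { produced.get(), late.size() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("produced=" + result[0] + ", seenByLateConsumer=" + result[1]);
    }
}
```

In this sketch all three elements are produced with nobody listening. The description above points to primedHotStream for the case where emission should wait until the first consumer connects.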
Code example source: origin: aol/cyclops
/**
 * Turns this Streamable into a Connectable, a connectable Stream, being executed on a thread on the
 * supplied executor, that is producing data
 * <pre>
 * {@code
 *  Connectable<Integer> ints = Streamable.range(0,Integer.MAX_VALUE)
 *                                        .hotStream(exec);
 *
 *  ints.connect().forEach(System.out::println);
 *  //print out all the ints
 *  //multiple consumers are possible, so other Streams can connect on different Threads
 *
 * }
 * </pre>
 * @param e Executor to execute this Streamable on
 * @return a Connectable
 */
default Connectable<T> hotStream(final Executor e) {
    return this.stream().hotStream(e);
}
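The Javadoc comment above notes that multiple consumers are possible. A rough JDK-only sketch of that fan-out idea follows. FanOutSketch and its per-consumer queues are assumptions made for illustration and say nothing about how Connectable is actually implemented. Both consumers are registered before the producer starts so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical fan-out sketch using only the JDK; not the cyclops implementation.
final class FanOutSketch {

    /** Emits 1, 2, 3 on an executor thread to two consumer queues and returns what each received. */
    static List<List<Integer>> demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        // One queue per connected consumer.
        List<BlockingQueue<Integer>> consumers = new ArrayList<>();
        consumers.add(new LinkedBlockingQueue<>());
        consumers.add(new LinkedBlockingQueue<>());

        // The producer copies every element to every consumer queue.
        Runnable producerTask = () -> {
            for (int i = 1; i <= 3; i++) {
                for (BlockingQueue<Integer> q : consumers) {
                    q.add(i);
                }
            }
        };
        Future<?> producer = exec.submit(producerTask);

        try {
            producer.get(); // wait until everything has been emitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }

        // Snapshot each queue in FIFO order.
        List<List<Integer>> received = new ArrayList<>();
        for (BlockingQueue<Integer> q : consumers) {
            received.add(new ArrayList<>(q));
        }
        return received;
    }
}
```

Each consumer ends up with its own copy of 1, 2, 3. In a real application each queue would typically be drained on its own thread.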
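Code example source: origin: aol/cyclops
@Test
public void hotStream() throws InterruptedException {
    // value and exec are defined on the enclosing test class (not shown)
    value = null;
    CountDownLatch latch = new CountDownLatch(1);
    Spouts.of(1, 2, 3)
          .peek(v -> value = v)
          .peek(v -> latch.countDown())
          .hotStream(exec);

    latch.await(); // wait until the executor thread has emitted at least one element
    assertTrue(value != null);
}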
Code example source: origin: aol/cyclops
@Test
public void hotStream() throws InterruptedException {
    value = null;
    CountDownLatch latch = new CountDownLatch(1);
    ReactiveSeq.of(1, 2, 3)
               .peek(v -> value = v)
               .peek(v -> latch.countDown())
               .hotStream(exec);

    latch.await();
    assertTrue(value != null);
}
Code example source: origin: aol/cyclops
@Test
public void hotStream() throws InterruptedException {
    value = null;
    CountDownLatch latch = new CountDownLatch(1);
    of(1, 2, 3)
        .peek(v -> value = v)
        .peek(v -> latch.countDown())
        .hotStream(exec);

    latch.await();
    assertTrue(value != null);
}
Code example source: origin: aol/cyclops
@Test
public void backpressure() {
    // captured, diff and exec are defined on the enclosing test class (not shown)
    captured = "";
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
    diff = System.currentTimeMillis();
    ReactiveSeq.range(0, Integer.MAX_VALUE)
               .limit(2)
               .map(i -> i.toString())
               .peek(v -> diff = System.currentTimeMillis() - diff)
               .peek(System.out::println)
               .hotStream(exec)
               .connect(blockingQueue)
               .onePer(1, TimeUnit.SECONDS)
               .forEach(c -> captured = c);

    assertThat(diff, greaterThan(500L));
}
Code example source: origin: aol/cyclops
@Test
public void backpressure() {
    captured = "";
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
    diff = System.currentTimeMillis();
    Spouts.range(0, Integer.MAX_VALUE)
          .limit(2)
          .map(i -> i.toString())
          .peek(v -> diff = System.currentTimeMillis() - diff)
          .peek(System.out::println)
          .hotStream(exec)
          .connect(blockingQueue)
          .onePer(1, TimeUnit.SECONDS)
          .forEach(c -> captured = c);

    assertThat(diff, greaterThan(500L));
}
Code example source: origin: aol/cyclops
@Test @Ignore
public void backpressure() {
    captured = "";
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
    diff = System.currentTimeMillis();
    range(0, Integer.MAX_VALUE)
        .limit(2)
        .map(i -> i.toString())
        .peek(v -> diff = System.currentTimeMillis() - diff)
        .peek(System.out::println)
        .hotStream(exec)
        .connect(blockingQueue)
        .onePer(1, TimeUnit.SECONDS)
        .forEach(c -> captured = c);

    assertThat(diff, greaterThan(500L));
}
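The backpressure tests above pass a LinkedBlockingQueue with capacity 3 to connect. What follows is a small JDK-only sketch of why a bounded queue limits a fast producer. It does not use cyclops, and BoundedQueueSketch is a hypothetical name. It uses the non-blocking offer so the outcome is deterministic; whether Connectable uses put, offer, or something else internally is not stated in the source.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of a bounded hand-off queue; not the cyclops implementation.
final class BoundedQueueSketch {

    /** Offers the given number of items to a queue of the given capacity; returns how many fit. */
    static int accepted(int capacity, int attempts) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer returns false instead of blocking when the queue is full
            if (queue.offer(Integer.toString(i))) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Nothing consumes from the queue, so only `capacity` items are accepted.
        System.out.println(accepted(3, 10));
    }
}
```

With LinkedBlockingQueue.put in place of offer, the producing thread would block on the fourth element until a consumer removed one, which is the usual way a bounded queue slows a producer down to the consumer's pace.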
Code example source: origin: aol/cyclops
@Test
public void hotStreamConnectBlockingQueue() throws InterruptedException {
    value = null;
    CountDownLatch latch = new CountDownLatch(1);
    Spouts.range(0, Integer.MAX_VALUE)
          .limit(1000)
          .peek(v -> value = v)
          .peek(v -> latch.countDown())
          .hotStream(exec)
          .connect(new LinkedBlockingQueue<>())
          .limit(100)
          .runFuture(ForkJoinPool.commonPool(),
                     t -> t.forEach(System.out::println, System.err::println));

    latch.await();
    assertTrue(value != null);
}
Code example source: origin: aol/cyclops
@Test @Ignore
public void hotStreamConnectBlockingQueue() throws InterruptedException {
    value = null;
    CountDownLatch latch = new CountDownLatch(1);
    range(0, Integer.MAX_VALUE)
        .limit(1000)
        .peek(v -> value = v)
        .peek(v -> latch.countDown())
        .hotStream(exec)
        .connect(new LinkedBlockingQueue<>())
        .limit(100)
        .runFuture(ForkJoinPool.commonPool(),
                   t -> t.forEach(System.out::println, System.err::println));

    latch.await();
    assertTrue(value != null);
}
Code example source: origin: aol/cyclops
@Test
public void hotStreamConnectBlockingQueue() throws InterruptedException {
    value = null;
    CountDownLatch latch = new CountDownLatch(1);
    ReactiveSeq.range(0, Integer.MAX_VALUE)
               .limit(1000)
               .peek(v -> value = v)
               .peek(v -> latch.countDown())
               .hotStream(exec)
               .connect(new LinkedBlockingQueue<>())
               .limit(100)
               .runFuture(ForkJoinPool.commonPool(),
                          t -> t.forEach(System.out::println, System.err::println));

    latch.await();
    assertTrue(value != null);
}