Usage and code examples of the cyclops.reactive.ReactiveSeq.hotStream() method

x33g5p2x, reposted on 2022-01-29 under "Other"

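This article collects code examples of the Java method cyclops.reactive.ReactiveSeq.hotStream and shows how ReactiveSeq.hotStream is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of the ReactiveSeq.hotStream method:
Fully qualified class: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: hotStream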

About ReactiveSeq.hotStream

Turns this ReactiveSeq into a Connectable (a connectable Stream) that produces data on a thread of the supplied executor. Note that the Connectable created by this method starts emitting data immediately. For a hot stream that waits until the first consumer connects, see ReactiveSeq#primedHotStream(Executor). The generated Connectable is not pausable; for a pausable Connectable, see ReactiveSeq#pausableHotStream(Executor).

Connectable<Integer> ints = ReactiveSeq.range(0,Integer.MAX_VALUE)
                                       .hotStream(exec);
ints.connect().forEach(System.out::println);
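
The "starts emitting immediately" behaviour described above can be illustrated without the library. The following is a minimal plain-JDK sketch of the general idea of a hot source, not the cyclops implementation: the class HotSourceSketch, the method lateSubscriberSees and the DONE sentinel are all hypothetical names introduced for illustration. A producer begins emitting on an executor before anyone is connected, and a consumer that connects late only receives the values emitted after it joined. Latches are used only to make the timing deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: this is NOT how cyclops implements hotStream.
public class HotSourceSketch {
    // Sentinel value that tells a consumer the producer has finished
    private static final int DONE = -1;

    public static List<Integer> lateSubscriberSees() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        // Queues of the consumers that are currently connected
        List<BlockingQueue<Integer>> subscribers = new CopyOnWriteArrayList<>();
        CountDownLatch firstHalfEmitted = new CountDownLatch(1);
        CountDownLatch subscriberConnected = new CountDownLatch(1);

        // The producer starts right away, whether or not anyone is connected
        exec.execute(() -> {
            try {
                for (int i = 0; i < 6; i++) {
                    if (i == 3) {
                        // 0, 1 and 2 have been emitted (and dropped: nobody was listening)
                        firstHalfEmitted.countDown();
                        subscriberConnected.await();
                    }
                    for (BlockingQueue<Integer> q : subscribers) {
                        q.put(i);
                    }
                }
                for (BlockingQueue<Integer> q : subscribers) {
                    q.put(DONE);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Connect late, after the first three values are already gone
        firstHalfEmitted.await();
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        subscribers.add(queue);
        subscriberConnected.countDown();

        List<Integer> seen = new ArrayList<>();
        for (int v = queue.take(); v != DONE; v = queue.take()) {
            seen.add(v);
        }
        exec.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lateSubscriberSees()); // prints [3, 4, 5]
    }
}
```

In this sketch the late consumer misses 0, 1 and 2. That is the trade-off the description hints at when it points to primedHotStream for cases where emission should wait for the first connection.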


Code examples

Example source: aol/cyclops

/**
 * Turns this Streamable into a Connectable, a connectable Stream, being executed on a thread on the
 * supplied executor, that is producing data
 * <pre>
 * {@code
 *  Connectable<Integer> ints = Streamable.range(0,Integer.MAX_VALUE)
 *                                        .hotStream(exec);
 *  ints.connect().forEach(System.out::println);
 *  //print out all the ints
 *  //multiple consumers are possible, so other Streams can connect on different Threads
 *
 * }
 * </pre>
 * @param e Executor to execute this Streamable on
 * @return a Connectable backed by this Streamable
 */
default Connectable<T> hotStream(final Executor e) {
  return this.stream().hotStream(e);
}

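Example source: aol/cyclops

// Assumes fields on the enclosing test class that the excerpt does not show:
// `value` (written from the stream's thread) and `exec` (an Executor).
@Test
public void hotStream() throws InterruptedException{
  value= null;
  CountDownLatch latch = new CountDownLatch(1);
  Spouts.of(1,2,3)
      .peek(v->value=v)            // record the most recent element
      .peek(v->latch.countDown())  // signal that at least one element was emitted
      .hotStream(exec);            // emission starts here; no connect() call is made
  latch.await();                   // wait for the first element
  assertTrue(value!=null);
}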

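Example source: aol/cyclops

// Assumes fields on the enclosing test class that the excerpt does not show:
// `value` (written from the stream's thread) and `exec` (an Executor).
@Test
public void hotStream() throws InterruptedException{
  value= null;
  CountDownLatch latch = new CountDownLatch(1);
  ReactiveSeq.of(1,2,3)
      .peek(v->value=v)            // record the most recent element
      .peek(v->latch.countDown())  // signal that at least one element was emitted
      .hotStream(exec);            // emission starts here; no connect() call is made
  latch.await();                   // wait for the first element
  assertTrue(value!=null);
}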

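Example source: aol/cyclops

// Assumes members of the enclosing test class that the excerpt does not show:
// fields `value` and `exec` (an Executor), and a factory method `of(...)`.
@Test
public void hotStream() throws InterruptedException{
  value= null;
  CountDownLatch latch = new CountDownLatch(1);
  of(1,2,3)
      .peek(v->value=v)            // record the most recent element
      .peek(v->latch.countDown())  // signal that at least one element was emitted
      .hotStream(exec);            // emission starts here; no connect() call is made
  latch.await();                   // wait for the first element
  assertTrue(value!=null);
}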

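Example source: aol/cyclops

// Assumes test-class fields not shown in the excerpt: `captured` (String), `diff` (long), `exec` (Executor).
@Test
public void backpressure(){
  captured= "";
  // Bounded queue (capacity 3) between the hot producer and the consumer
  LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
  diff =  System.currentTimeMillis();
  ReactiveSeq.range(0, Integer.MAX_VALUE)
    .limit(2)
    .map(i -> i.toString())
    // Note: diff is reassigned on every element, so it is an elapsed time only after the first one
    .peek(v-> diff = System.currentTimeMillis()-diff)
    .peek(System.out::println)
    .hotStream(exec)
    .connect(blockingQueue)
    .onePer(1, TimeUnit.SECONDS)
    .forEach(c->captured=c);
  assertThat(diff,greaterThan(500L));
}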

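Example source: aol/cyclops

// Assumes test-class fields not shown in the excerpt: `captured` (String), `diff` (long), `exec` (Executor).
@Test
public void backpressure(){
  captured= "";
  // Bounded queue (capacity 3) between the hot producer and the consumer
  LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
  diff =  System.currentTimeMillis();
  Spouts.range(0, Integer.MAX_VALUE)
    .limit(2)
    .map(i -> i.toString())
    // Note: diff is reassigned on every element, so it is an elapsed time only after the first one
    .peek(v-> diff = System.currentTimeMillis()-diff)
    .peek(System.out::println)
    .hotStream(exec)
    .connect(blockingQueue)
    .onePer(1, TimeUnit.SECONDS)
    .forEach(c->captured=c);
  assertThat(diff,greaterThan(500L));
}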

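Example source: aol/cyclops

// Assumes members of the enclosing test class not shown in the excerpt:
// fields `captured` (String), `diff` (long), `exec` (Executor), and a factory method `range(...)`.
@Test @Ignore
public void backpressure(){
  captured= "";
  // Bounded queue (capacity 3) between the hot producer and the consumer
  LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
  diff =  System.currentTimeMillis();
  range(0, Integer.MAX_VALUE)
    .limit(2)
    .map(i -> i.toString())
    // Note: diff is reassigned on every element, so it is an elapsed time only after the first one
    .peek(v-> diff = System.currentTimeMillis()-diff)
    .peek(System.out::println)
    .hotStream(exec)
    .connect(blockingQueue)
    .onePer(1, TimeUnit.SECONDS)
    .forEach(c->captured=c);
  assertThat(diff,greaterThan(500L));
}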
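
The backpressure examples above connect through a LinkedBlockingQueue created with a capacity of 3. As a plain-JDK illustration of what such a bounded queue does (and not a statement about how cyclops uses the queue internally, which the excerpts do not show), the sketch below counts how many elements a full queue accepts when nobody is consuming. The class BoundedQueueSketch and the method acceptedBeforeFull are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of a bounded queue; not taken from the cyclops code base.
public class BoundedQueueSketch {

    // Offers `attempts` elements to a queue of the given capacity with no consumer
    // attached, and returns how many of them the queue accepted.
    public static int acceptedBeforeFull(int capacity, int attempts) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false instead of blocking when the queue is full;
            // put() would block the calling thread until space becomes available
            if (queue.offer(Integer.toString(i))) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedBeforeFull(3, 10)); // prints 3
    }
}
```

A producer that uses the blocking put() on such a queue cannot run more than three elements ahead of its consumer, which is the basic mechanism a bounded queue offers for slowing a fast producer.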

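Example source: aol/cyclops

// Assumes fields on the enclosing test class that the excerpt does not show:
// `value` (written from the stream's thread) and `exec` (an Executor).
@Test
public void hotStreamConnectBlockingQueue() throws InterruptedException{
  value= null;
  CountDownLatch latch = new CountDownLatch(1);
  Spouts.range(0,Integer.MAX_VALUE)
      .limit(1000)
      .peek(v->value=v)
      .peek(v->latch.countDown())
      .hotStream(exec)
      .connect(new LinkedBlockingQueue<>())   // connect a consumer through an unbounded queue
      .limit(100)                             // the consumer takes at most 100 elements
      .runFuture(ForkJoinPool.commonPool(),   // consume asynchronously on the common pool
          t->t.forEach(System.out::println,System.err::println));
  latch.await();
  assertTrue(value!=null);
}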

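Example source: aol/cyclops

// Assumes members of the enclosing test class that the excerpt does not show:
// fields `value` and `exec` (an Executor), and a factory method `range(...)`.
@Test @Ignore
public void hotStreamConnectBlockingQueue() throws InterruptedException{
  value= null;
  CountDownLatch latch = new CountDownLatch(1);
  range(0,Integer.MAX_VALUE)
      .limit(1000)
      .peek(v->value=v)
      .peek(v->latch.countDown())
      .hotStream(exec)
      .connect(new LinkedBlockingQueue<>())   // connect a consumer through an unbounded queue
      .limit(100)                             // the consumer takes at most 100 elements
      .runFuture(ForkJoinPool.commonPool(),   // consume asynchronously on the common pool
          t->t.forEach(System.out::println,System.err::println));
  latch.await();
  assertTrue(value!=null);
}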

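Example source: aol/cyclops

// Assumes fields on the enclosing test class that the excerpt does not show:
// `value` (written from the stream's thread) and `exec` (an Executor).
@Test
public void hotStreamConnectBlockingQueue() throws InterruptedException{
  value= null;
  CountDownLatch latch = new CountDownLatch(1);
  ReactiveSeq.range(0,Integer.MAX_VALUE)
      .limit(1000)
      .peek(v->value=v)
      .peek(v->latch.countDown())
      .hotStream(exec)
      .connect(new LinkedBlockingQueue<>())   // connect a consumer through an unbounded queue
      .limit(100)                             // the consumer takes at most 100 elements
      .runFuture(ForkJoinPool.commonPool(),   // consume asynchronously on the common pool
          t->t.forEach(System.out::println,System.err::println));
  latch.await();
  assertTrue(value!=null);
}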

Example source: com.oath.cyclops/cyclops

/**
 * Turns this Streamable into a Connectable, a connectable Stream, being executed on a thread on the
 * supplied executor, that is producing data
 * <pre>
 * {@code
 *  Connectable<Integer> ints = Streamable.range(0,Integer.MAX_VALUE)
 *                                        .hotStream(exec);
 *  ints.connect().forEach(System.out::println);
 *  //print out all the ints
 *  //multiple consumers are possible, so other Streams can connect on different Threads
 *
 * }
 * </pre>
 * @param e Executor to execute this Streamable on
 * @return a Connectable backed by this Streamable
 */
default Connectable<T> hotStream(final Executor e) {
  return this.stream().hotStream(e);
}
