cyclops.reactive.ReactiveSeq.peek()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(100)

本文整理了Java中cyclops.reactive.ReactiveSeq.peek方法的一些代码示例,展示了ReactiveSeq.peek的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ReactiveSeq.peek方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:peek

ReactiveSeq.peek介绍

暂无

代码示例

代码示例来源:origin: aol/micro-server

public ReactiveSeq<Tuple2<String,String>> extractResources() {
  return resources.stream().peek(resource -> logMissingPath(resource))
              .filter(resource-> resource.getClass().getAnnotation(Path.class)!=null)
              .map(resource -> Tuple.tuple(resource.getClass().getName(),
                  resource.getClass().getAnnotation(Path.class).value()));
}

代码示例来源:origin: aol/micro-server

default String getProviders() {
  String additional = ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                                      .stream())
                  .peek(System.out::println)
                  .filter(module -> module.providers() != null)
                  .concatMap(Plugin::providers)
                  .join(",");
  if (StringUtils.isEmpty(additional))
    return "com.oath.micro.server.rest.providers";
  return "com.oath.micro.server.rest.providers," + additional;
}

代码示例来源:origin: aol/micro-server

private ReactiveSeq<SystemData<String, String>> create(DataLoader dl) {
    return ReactiveSeq.generate(() -> 1)
      .filter(in -> condition.shouldLoad())
      .map(in -> dl.scheduleAndLog())
      .peek(sd -> bus.post(sd));
  }
}

代码示例来源:origin: aol/micro-server

public void schedule() {
    cleaner.forEach(cl -> {
      ReactiveSeq.generate(() -> 1)
            .filter(in -> condition.shouldClean())
            .map(i -> cl.scheduleAndLog())
            .peek(sd -> bus.post(sd))
            .schedule(cl.getCron(), executor);
    });
  }
}

代码示例来源:origin: aol/cyclops

default ReactiveSeq<T> publishTo(Adapter<T>... adapters){
  return peek(e->{
    for(Adapter<T> next:  adapters){
       next.offer(e);
    }
  });
}
default ReactiveSeq<T> publishTo(Signal<T>... signals){

代码示例来源:origin: aol/cyclops

@Test
public void scheduleCron() throws Exception {
  assertThat(Spouts.schedule(Stream.of(1,2,3),"* * * * * ?", Executors.newScheduledThreadPool(1))
    .limit(5)
    .peek(System.out::println)
    .collect(Collectors.toList()).size(),equalTo(3));
}

代码示例来源:origin: aol/cyclops

@Test
public void limitWhileTest(){
  List<Integer> list = new ArrayList<>();
  while(list.size()==0){
    list = of(1,2,3,4,5,6).takeWhile(it -> it<4)
          .peek(it -> System.out.println(it)).collect(Collectors.toList());
  }
  assertThat(Arrays.asList(1,2,3,4,5,6),hasItem(list.get(0)));
}

代码示例来源:origin: aol/cyclops

@Test
public void cronDebounceTest() throws InterruptedException{
  assertThat(of(1,2,3,4)
      .peek(i->count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex)
      .connect()
      .debounce(1,TimeUnit.DAYS)
      .peek(System.out::println)
      .toList(),equalTo(Arrays.asList(1)));
  
  
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void cronDebounceTest() throws InterruptedException{
  assertThat(Spouts.of(1,2,3,4)
      .peek(i->count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex)
      .connect()
      .debounce(1,TimeUnit.DAYS)
      .peek(System.out::println)
      .toList(),equalTo(Arrays.asList(1)));
  
  
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void testLazyCollection(){
  Collection<Integer> col = Spouts.of(1,2,3,4,5)
                    .peek(System.out::println).to()
                    .lazyCollection();
  System.out.println("takeOne!");
  col.forEach(System.out::println);
  assertThat(col.size(),equalTo(5));
}

代码示例来源:origin: aol/cyclops

@Test
public void batchBySize() {
  iterate("", last -> "next")
      .limit(100)
      .grouped(10)
      .onePer(1, TimeUnit.MICROSECONDS)
      .peek(batch -> System.out.println("batched : " + batch))
      .concatMap(i->i)
      .peek(individual -> System.out.println("Flattened : "
          + individual))
      .forEach(a->{});
}

代码示例来源:origin: aol/cyclops

@Test
public void skipTimeEmpty(){
  List<Integer> result = ReactiveSeq.<Integer>of()
      .peek(i->sleep(i*100))
      .drop(1000,TimeUnit.MILLISECONDS)
      .toList();
  assertThat(result,equalTo(Arrays.asList()));
}
private int sleep(Integer i) {

代码示例来源:origin: aol/cyclops

@Test
public void limitTimeEmpty(){
  List<Integer> result = ReactiveSeq.<Integer>of()
                  .peek(i->sleep(i*100))
                  .take(1000,TimeUnit.MILLISECONDS)
                  .toList();
  assertThat(result,equalTo(Arrays.asList()));
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void skipTimeEmpty(){
  List<Integer> result = ReactiveSeq.<Integer>of()
                  .peek(i->sleep(i*100))
                  .drop(1000,TimeUnit.MILLISECONDS)
                  .toList();
  assertThat(result,equalTo(Arrays.asList()));
}
private int sleep(Integer i) {

代码示例来源:origin: aol/cyclops

@Test
public void limitTimeEmpty(){
  List<Integer> result = ReactiveSeq.<Integer>of()
                  .peek(i->sleep(i*100))
                  .take(1000,TimeUnit.MILLISECONDS)
                  .toList();
  assertThat(result,equalTo(Arrays.asList()));
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void fixedDelay2() {
  Spouts.range(0, 1000)
      .fixedDelay(1l, TimeUnit.MICROSECONDS).peek(System.out::println)
      .forEach(a->{});
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void onePerSecond() {
  long start = System.currentTimeMillis();
      iterate(0, it -> it + 1)
      .limit(3)
      .onePer(1, TimeUnit.SECONDS)
      .map(seconds -> "hello!")
      .peek(System.out::println)
      .toList();
 assertTrue(System.currentTimeMillis()-start>1900);
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void onePerSecond() {
  long start = System.currentTimeMillis();
      iterate(0, it -> it + 1)
      .limit(3)
      .onePer(1, TimeUnit.SECONDS)
      .map(seconds -> "hello!")
      .peek(System.out::println)
      .toList();
 assertTrue(System.currentTimeMillis()-start>1900);
}
@Test

代码示例来源:origin: aol/cyclops

@Test
public void jitter() {
  Spouts.range(0, 1000)
      .map(it -> it * 100)
      .jitter(100l)
      .peek(System.out::println)
      .forEach(a->{});
}

代码示例来源:origin: aol/cyclops

@Test
public void onePerSecond() {
      iterate(0, it -> it + 1)
      .limit(100)
      .onePer(1, TimeUnit.MICROSECONDS)
      .map(seconds -> "hello!")
      .peek(System.out::println)
      .toList();
}

相关文章

微信公众号

最新文章

更多

ReactiveSeq类方法