本文整理了Java中cyclops.reactive.ReactiveSeq.peek方法的一些代码示例,展示了ReactiveSeq.peek的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.peek方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:peek
暂无
代码示例来源:origin: aol/micro-server
/**
 * Builds a sequence of (class name, path value) pairs from the registered resources.
 *
 * <p>Each resource is first passed to {@code logMissingPath}; resources whose class
 * carries no {@code Path} annotation are then filtered out.
 *
 * @return tuples holding the resource's class name and the value of its {@code Path} annotation
 */
public ReactiveSeq<Tuple2<String, String>> extractResources() {
    return resources.stream()
            .peek(candidate -> logMissingPath(candidate))
            // Only annotated classes can supply a path value in the mapping step below.
            .filter(candidate -> candidate.getClass().isAnnotationPresent(Path.class))
            .map(annotated -> Tuple.tuple(
                    annotated.getClass().getName(),
                    annotated.getClass().getAnnotation(Path.class).value()));
}
代码示例来源:origin: aol/micro-server
/**
 * Assembles the comma-separated list of provider packages.
 *
 * <p>The providers of every loaded plugin whose {@code providers()} is non-null are
 * joined with commas and appended to the built-in provider package. Each plugin is
 * also printed to standard output while the sequence is traversed.
 *
 * @return the built-in provider package alone when the plugins contribute nothing,
 *         otherwise the built-in package followed by the plugin providers
 */
default String getProviders() {
    final String pluginProviders =
            ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get().stream())
                    .peek(System.out::println)
                    .filter(plugin -> plugin.providers() != null)
                    .concatMap(Plugin::providers)
                    .join(",");

    return StringUtils.isEmpty(pluginProviders)
            ? "com.oath.micro.server.rest.providers"
            : "com.oath.micro.server.rest.providers," + pluginProviders;
}
代码示例来源:origin: aol/micro-server
/**
 * Builds the loading pipeline for a single data loader.
 *
 * <p>The sequence is generated from a constant supplier; an element only passes on
 * when {@code condition.shouldLoad()} is true, is replaced by the result of
 * {@code dl.scheduleAndLog()}, and that result is posted to {@code bus}.
 *
 * @param dl the loader whose {@code scheduleAndLog()} produces each element
 * @return the sequence of results produced by the loader
 */
private ReactiveSeq<SystemData<String, String>> create(DataLoader dl) {
    return ReactiveSeq.generate(() -> 1)
            // The generated value is only a tick; it is never used downstream.
            .filter(tick -> condition.shouldLoad())
            .map(tick -> dl.scheduleAndLog())
            .peek(loaded -> bus.post(loaded));
}
}
代码示例来源:origin: aol/micro-server
/**
 * Schedules one cleaning pipeline per entry in {@code cleaner}.
 *
 * <p>For each entry, a sequence generated from a constant supplier is gated by
 * {@code condition.shouldClean()}, mapped to the entry's {@code scheduleAndLog()}
 * result, posted to {@code bus}, and scheduled with the entry's cron expression on
 * {@code executor}.
 */
public void schedule() {
    cleaner.forEach(job -> {
        ReactiveSeq.generate(() -> 1)
                // The generated value is only a tick; it is never used downstream.
                .filter(tick -> condition.shouldClean())
                .map(tick -> job.scheduleAndLog())
                .peek(cleaned -> bus.post(cleaned))
                .schedule(job.getCron(), executor);
    });
}
}
代码示例来源:origin: aol/cyclops
/**
 * Returns a sequence that offers every element it sees to each of the given adapters.
 *
 * @param adapters the adapters that receive each element via {@code offer}
 * @return the sequence produced by {@code peek} with the publishing action attached
 */
default ReactiveSeq<T> publishTo(Adapter<T>... adapters) {
    return peek(element -> {
        // Adapters are offered the element in argument order.
        for (int idx = 0; idx < adapters.length; idx++) {
            adapters[idx].offer(element);
        }
    });
}
default ReactiveSeq<T> publishTo(Signal<T>... signals){
代码示例来源:origin: aol/cyclops
/**
 * Schedules a three-element stream on a cron expression, caps it at five elements,
 * and asserts that exactly three elements are collected.
 */
@Test
public void scheduleCron() throws Exception {
    int emitted = Spouts.schedule(Stream.of(1, 2, 3), "* * * * * ?", Executors.newScheduledThreadPool(1))
            .limit(5)
            .peek(System.out::println)
            .collect(Collectors.toList())
            .size();

    assertThat(emitted, equalTo(3));
}
代码示例来源:origin: aol/cyclops
/**
 * Repeats a {@code takeWhile(< 4)} over 1..6 until a non-empty list is collected,
 * then asserts that its first element is one of the source values.
 */
@Test
public void limitWhileTest() {
    List<Integer> taken = new ArrayList<>();
    // Retry until the pipeline yields at least one element.
    while (taken.isEmpty()) {
        taken = of(1, 2, 3, 4, 5, 6)
                .takeWhile(value -> value < 4)
                .peek(value -> System.out.println(value))
                .collect(Collectors.toList());
    }

    assertThat(Arrays.asList(1, 2, 3, 4, 5, 6), hasItem(taken.get(0)));
}
代码示例来源:origin: aol/cyclops
/**
 * Schedules 1..4 on a cron expression, debounces the connected stream with a
 * one-day window, and asserts that the resulting list is {@code [1]}.
 * Every element seen before scheduling increments {@code count} and is printed.
 */
@Test
public void cronDebounceTest() throws InterruptedException {
    assertThat(
            of(1, 2, 3, 4)
                    .peek(seen -> count.incrementAndGet())
                    .peek(System.out::println)
                    .schedule("* * * * * ?", ex)
                    .connect()
                    .debounce(1, TimeUnit.DAYS)
                    .peek(System.out::println)
                    .toList(),
            equalTo(Arrays.asList(1)));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Schedules {@code Spouts.of(1,2,3,4)} on a cron expression, debounces the connected
 * stream with a one-day window, and asserts that the resulting list is {@code [1]}.
 * Every element seen before scheduling increments {@code count} and is printed.
 */
@Test
public void cronDebounceTest() throws InterruptedException {
    assertThat(
            Spouts.of(1, 2, 3, 4)
                    .peek(seen -> count.incrementAndGet())
                    .peek(System.out::println)
                    .schedule("* * * * * ?", ex)
                    .connect()
                    .debounce(1, TimeUnit.DAYS)
                    .peek(System.out::println)
                    .toList(),
            equalTo(Arrays.asList(1)));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Converts a five-element sequence into a lazy collection, prints a marker line,
 * iterates the collection, and asserts that it reports a size of five.
 */
@Test
public void testLazyCollection() {
    Collection<Integer> lazy = Spouts.of(1, 2, 3, 4, 5)
            .peek(System.out::println)
            .to()
            .lazyCollection();

    System.out.println("takeOne!");
    lazy.forEach(System.out::println);

    assertThat(lazy.size(), equalTo(5));
}
代码示例来源:origin: aol/cyclops
/**
 * Groups 100 iterated strings into batches of ten, applies {@code onePer} with a
 * one-microsecond unit, prints each batch, flattens the batches, and prints each
 * individual element. The test makes no assertion; it only drains the pipeline.
 */
@Test
public void batchBySize() {
    iterate("", previous -> "next")
            .limit(100)
            .grouped(10)
            .onePer(1, TimeUnit.MICROSECONDS)
            .peek(group -> System.out.println("batched : " + group))
            .concatMap(group -> group)
            .peek(single -> System.out.println("Flattened : " + single))
            .forEach(ignored -> {});
}
代码示例来源:origin: aol/cyclops
/**
 * Asserts that dropping the first 1000 milliseconds of an empty sequence yields an
 * empty list. The {@code peek} stage would call {@code sleep(i*100)} for any element.
 */
@Test
public void skipTimeEmpty() {
    List<Integer> remaining = ReactiveSeq.<Integer>of()
            .peek(value -> sleep(value * 100))
            .drop(1000, TimeUnit.MILLISECONDS)
            .toList();

    assertThat(remaining, equalTo(Arrays.asList()));
}
private int sleep(Integer i) {
代码示例来源:origin: aol/cyclops
/**
 * Asserts that taking the first 1000 milliseconds of an empty sequence yields an
 * empty list. The {@code peek} stage would call {@code sleep(i*100)} for any element.
 */
@Test
public void limitTimeEmpty() {
    List<Integer> collected = ReactiveSeq.<Integer>of()
            .peek(value -> sleep(value * 100))
            .take(1000, TimeUnit.MILLISECONDS)
            .toList();

    assertThat(collected, equalTo(Arrays.asList()));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Asserts that dropping the first 1000 milliseconds of an empty sequence yields an
 * empty list. The {@code peek} stage would call {@code sleep(i*100)} for any element.
 */
@Test
public void skipTimeEmpty() {
    List<Integer> remaining = ReactiveSeq.<Integer>of()
            .peek(value -> sleep(value * 100))
            .drop(1000, TimeUnit.MILLISECONDS)
            .toList();

    assertThat(remaining, equalTo(Arrays.asList()));
}
private int sleep(Integer i) {
代码示例来源:origin: aol/cyclops
/**
 * Asserts that taking the first 1000 milliseconds of an empty sequence yields an
 * empty list. The {@code peek} stage would call {@code sleep(i*100)} for any element.
 */
@Test
public void limitTimeEmpty() {
    List<Integer> collected = ReactiveSeq.<Integer>of()
            .peek(value -> sleep(value * 100))
            .take(1000, TimeUnit.MILLISECONDS)
            .toList();

    assertThat(collected, equalTo(Arrays.asList()));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Drains the range 0..1000 through {@code fixedDelay} with a one-microsecond unit,
 * printing each element. The test makes no assertion.
 */
@Test
public void fixedDelay2() {
    Spouts.range(0, 1000)
            .fixedDelay(1L, TimeUnit.MICROSECONDS)
            .peek(System.out::println)
            .forEach(ignored -> {});
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Pushes three elements through {@code onePer(1, SECONDS)} and asserts that the
 * whole run took more than 1900 milliseconds of wall-clock time.
 */
@Test
public void onePerSecond() {
    long startedAt = System.currentTimeMillis();

    iterate(0, previous -> previous + 1)
            .limit(3)
            .onePer(1, TimeUnit.SECONDS)
            .map(tick -> "hello!")
            .peek(System.out::println)
            .toList();

    long elapsedMillis = System.currentTimeMillis() - startedAt;
    assertTrue(elapsedMillis > 1900);
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Pushes three elements through {@code onePer(1, SECONDS)} and asserts that the
 * whole run took more than 1900 milliseconds of wall-clock time.
 */
@Test
public void onePerSecond() {
    long startedAt = System.currentTimeMillis();

    iterate(0, previous -> previous + 1)
            .limit(3)
            .onePer(1, TimeUnit.SECONDS)
            .map(tick -> "hello!")
            .peek(System.out::println)
            .toList();

    long elapsedMillis = System.currentTimeMillis() - startedAt;
    assertTrue(elapsedMillis > 1900);
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Drains the range 0..1000, each value multiplied by 100, through {@code jitter(100)},
 * printing each element. The test makes no assertion.
 */
@Test
public void jitter() {
    Spouts.range(0, 1000)
            .map(value -> value * 100)
            .jitter(100L)
            .peek(System.out::println)
            .forEach(ignored -> {});
}
代码示例来源:origin: aol/cyclops
/**
 * Pushes 100 elements through {@code onePer} with a one-microsecond unit, mapping
 * each to a fixed greeting and printing it. The test makes no assertion.
 */
@Test
public void onePerSecond() {
    iterate(0, previous -> previous + 1)
            .limit(100)
            .onePer(1, TimeUnit.MICROSECONDS)
            .map(tick -> "hello!")
            .peek(System.out::println)
            .toList();
}
内容来源于网络,如有侵权,请联系作者删除!