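This article collects code examples for the Java method cyclops.reactive.ReactiveSeq.scheduleFixedRate and shows how ReactiveSeq.scheduleFixedRate is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of the ReactiveSeq.scheduleFixedRate method are as follows: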
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: scheduleFixedRate
Description: Execute this Stream on a schedule.
//run every 60 seconds
Connect to the Scheduled Stream:
Connectable dataStream = ReactiveSeq.generate(() -> "next job:" + formatDate(new Date())).map(this::processJob)
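To make the description above concrete, the following is a minimal JDK-only sketch of what running a stream at a fixed rate could look like. It does not use cyclops and is not the library's implementation: the class FixedRateSketch and the helper emitAtFixedRate are hypothetical names, and the sketch assumes that one element is released per tick and handed to a connected queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRateSketch {

    // Hypothetical helper (not part of cyclops): releases one element of `source`
    // per tick into a queue and returns what a connected consumer would have received.
    static <T> List<T> emitAtFixedRate(List<T> source, long rateMillis) {
        ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
        BlockingQueue<T> connected = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(1);
        Iterator<T> it = source.iterator();
        try {
            ex.scheduleAtFixedRate(() -> {
                if (it.hasNext()) {
                    connected.add(it.next()); // one element per tick
                } else {
                    done.countDown();         // source exhausted
                }
            }, 0, rateMillis, TimeUnit.MILLISECONDS);
            done.await();                     // wait until every element was emitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ex.shutdownNow();                 // stop the periodic task
        }
        return new ArrayList<>(connected);
    }

    public static void main(String[] args) {
        // prints [job-1, job-2, job-3], with roughly 50 ms between emissions
        System.out.println(emitAtFixedRate(Arrays.asList("job-1", "job-2", "job-3"), 50));
    }
}
```

The queue plays the role of the connection point: the scheduler thread produces at its own pace, and a consumer can drain the queue independently.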
Code example source: origin: aol/cyclops
/**
 * Memoize a function and update the cached values asynchronously using the provided Scheduled Executor Service.
 * Does not support null keys.
 *
 * @param fn Function to Memoize
 * @param ex Scheduled Executor Service
 * @param updateRateInMillis Time in millis between async updates
 * @param <T> Input Type of Function
 * @param <R> Return type of Function
 * @return Memoized asynchronously updating function
 */
public static <T, R> Function1<T, R> memoizeFunctionAsync(final Function<T, R> fn, ScheduledExecutorService ex, long updateRateInMillis) {
    final Map<T, R> lazy = new ConcurrentHashMap<>();
    // On every scheduled tick, recompute the value for each key already in the cache.
    ReactiveSeq.generate(() -> {
        lazy.forEach((k, v) -> {
            lazy.put(k, fn.apply(k));
        });
        return null;
    }).scheduleFixedRate(updateRateInMillis, ex);
    // Lookups compute a missing value once and otherwise return the cached one.
    return t -> lazy.computeIfAbsent(t, fn);
}
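The idea in the Javadoc above, serving lookups from a cache while a scheduler recomputes the cached values in the background, can also be sketched with the JDK alone. This is an illustration, not the cyclops code: RefreshingMemoSketch, memoizeWithRefresh, and callsForTwoLookups are hypothetical names, and ScheduledExecutorService.scheduleAtFixedRate stands in for ReactiveSeq.scheduleFixedRate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class RefreshingMemoSketch {

    // Hypothetical JDK-only counterpart of memoizeFunctionAsync.
    // Like the original, it supports neither null keys nor null results.
    static <T, R> Function<T, R> memoizeWithRefresh(Function<T, R> fn,
                                                    ScheduledExecutorService ex,
                                                    long updateRateInMillis) {
        Map<T, R> cache = new ConcurrentHashMap<>();
        // Periodically recompute the value of every key that is already cached.
        ex.scheduleAtFixedRate(
                () -> cache.replaceAll((key, oldValue) -> fn.apply(key)),
                updateRateInMillis, updateRateInMillis, TimeUnit.MILLISECONDS);
        // Lookups compute a missing value once and otherwise hit the cache.
        return key -> cache.computeIfAbsent(key, fn);
    }

    // Looks the same key up twice, with a refresh interval long enough that no
    // refresh runs in between, and returns how often the wrapped function ran.
    static int callsForTwoLookups() {
        ScheduledExecutorService ex = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            Function<String, Integer> lengths = memoizeWithRefresh(s -> {
                calls.incrementAndGet();
                return s.length();
            }, ex, TimeUnit.HOURS.toMillis(1));
            lengths.apply("cyclops");
            lengths.apply("cyclops"); // served from the cache
            return calls.get();
        } finally {
            ex.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(callsForTwoLookups()); // prints 1
    }
}
```

One design point worth noting: if the wrapped function throws inside the scheduled task, scheduleAtFixedRate suppresses all later runs, so a production version would probably catch and log exceptions inside the refresh task.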
Code example source: origin: aol/cyclops
@Test @Ignore
public void backpressureScheduledRate() {
    captured = "";
    diff = System.currentTimeMillis();
    // Capacity-1 queue: add() fills it, so the following offer() is rejected and returns false.
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
    blockingQueue.add("10");
    blockingQueue.offer("10");
    range(0, Integer.MAX_VALUE)
        .limit(2)
        .peek(v -> diff = System.currentTimeMillis())
        .map(i -> i.toString())
        .scheduleFixedRate(1L, scheduled)
        .connect(blockingQueue)
        .onePer(1, TimeUnit.SECONDS)
        .peek(i -> System.out.println("BQ " + blockingQueue))
        .peek(System.out::println)
        .forEach(c -> captured = c);
    assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}
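The test above starts from a LinkedBlockingQueue with capacity 1 that is already full. The short sketch below, using only the JDK, shows how such a queue treats further inserts; BoundedQueueSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {

    // Fills a capacity-1 queue the same way the test does and reports
    // whether the second, non-blocking offer was accepted.
    static boolean secondOfferAccepted() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("10");          // succeeds: the queue was empty
        return queue.offer("10"); // queue is full, so offer returns false instead of blocking
    }

    // A timed offer waits for free space; with no consumer draining the queue,
    // it gives up after the timeout and returns false.
    static boolean timedOfferAccepted(long timeoutMillis) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("10");
        try {
            return queue.offer("11", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondOfferAccepted());    // prints false
        System.out.println(timedOfferAccepted(100));  // prints false after about 100 ms
    }
}
```

A producer that cannot place elements into a full queue has to wait or drop them until the consumer catches up, which is the situation the backpressure test sets up.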
Code example source: origin: aol/cyclops
@Test
public void backpressureScheduledRate() {
    captured = "";
    diff = System.currentTimeMillis();
    // Capacity-1 queue: add() fills it, so the following offer() is rejected and returns false.
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
    blockingQueue.add("10");
    blockingQueue.offer("10");
    ReactiveSeq.range(0, Integer.MAX_VALUE)
        .limit(2)
        .peek(v -> diff = System.currentTimeMillis())
        .map(i -> i.toString())
        .scheduleFixedRate(1L, scheduled)
        .connect(blockingQueue)
        .onePer(1, TimeUnit.SECONDS)
        .peek(i -> System.out.println("BQ " + blockingQueue))
        .peek(System.out::println)
        .forEach(c -> captured = c);
    assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}
Code example source: origin: aol/cyclops
@Test
public void backpressureScheduledRate() {
    captured = "";
    diff = System.currentTimeMillis();
    // Capacity-1 queue: add() fills it, so the following offer() is rejected and returns false.
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
    blockingQueue.add("10");
    blockingQueue.offer("10");
    Spouts.range(0, Integer.MAX_VALUE)
        .limit(2)
        .peek(v -> diff = System.currentTimeMillis())
        .map(i -> i.toString())
        .scheduleFixedRate(1L, scheduled)
        .connect(blockingQueue)
        .onePer(1, TimeUnit.SECONDS)
        .peek(i -> System.out.println("BQ " + blockingQueue))
        .peek(System.out::println)
        .forEach(c -> captured = c);
    assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}
Code example source: origin: aol/cyclops
@Test
public void fixedRateTest() throws InterruptedException {
    assertThat(of(1, 2, 3, 4)
                   .peek(i -> count.incrementAndGet())
                   .peek(System.out::println)
                   .scheduleFixedRate(1000, ex)
                   .connect()
                   .debounce(1, TimeUnit.DAYS)
                   .peek(System.out::println)
                   .toList(),
               equalTo(Arrays.asList(1)));
}
Code example source: origin: aol/cyclops
@Test
public void fixedRateTest() throws InterruptedException {
    assertThat(Spouts.of(1, 2, 3, 4)
                   .peek(i -> count.incrementAndGet())
                   .peek(System.out::println)
                   .scheduleFixedRate(1000, ex)
                   .connect()
                   .debounce(1, TimeUnit.DAYS)
                   .peek(System.out::println)
                   .toList(),
               equalTo(Arrays.asList(1)));
}
Code example source: origin: aol/cyclops
@Test
public void fixedRateTest() throws InterruptedException {
    assertThat(ReactiveSeq.of(1, 2, 3, 4)
                   .peek(i -> count.incrementAndGet())
                   .peek(System.out::println)
                   .scheduleFixedRate(1000, ex)
                   .connect()
                   .debounce(1, TimeUnit.DAYS)
                   .peek(System.out::println)
                   .toList(),
               equalTo(Arrays.asList(1)));
}
Code example source: origin: com.oath.cyclops/cyclops
The memoizeFunctionAsync method published under this origin is identical to the first code example in this article.
Code example source: origin: com.oath.cyclops/cyclops-futurestream
@Override
public Connectable<U> scheduleFixedRate(final long rate, final ScheduledExecutorService ex) {
    // Delegate to ReactiveSeq by wrapping this stream.
    return ReactiveSeq.<U>fromStream(this.stream())
                      .scheduleFixedRate(rate, ex);
}