本文整理了Java中cyclops.reactive.ReactiveSeq.schedule
方法的一些代码示例,展示了ReactiveSeq.schedule
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ReactiveSeq.schedule
方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:schedule
[英]Execute this Stream on a schedule
//run at 8PM every night
Connect to the Scheduled Stream
Connectable dataStream = ReactiveSeq.generate(() -> "next job:" + formatDate(new Date())).map(this::processJob)
[中]按计划执行此流
//run at 8PM every night
连接到计划流
Connectable dataStream = ReactiveSeq.generate(() -> "next job:" + formatDate(new Date())).map(this::processJob)
代码示例来源:origin: aol/micro-server
public void schedule() {
cleaner.forEach(cl -> {
ReactiveSeq.generate(() -> 1)
.filter(in -> condition.shouldClean())
.map(i -> cl.scheduleAndLog())
.peek(sd -> bus.post(sd))
.schedule(cl.getCron(), executor);
});
}
}
代码示例来源:origin: aol/micro-server
public void schedule() {
loader.forEach(dl -> {
// run on startup
create(dl).limit(1).foldFuture(executor, s -> s.forEach(Long.MAX_VALUE, l -> {}));
// schedule
create(dl).schedule(dl.getCron(), executor);
});
}
代码示例来源:origin: aol/cyclops
/**
* Memoize this function and update cached values on a schedule
* Does not support null keys
*
* @param fn Function to Memoize
* @param ex Scheduled Executor Service
* @param cron Cron expression for updating cached values asynchonrously
* @param <T> Input Type of Function
* @param <R> Return type of Function
* @return Memoized asynchronously updating function
*/
public static <T, R> Function1<T, R> memoizeFunctionAsync(final Function<T, R> fn, ScheduledExecutorService ex, String cron) {
final Map<T, R> lazy = new ConcurrentHashMap<>();
ReactiveSeq.generate(()->{
lazy.forEach((k,v)->{
lazy.put(k,fn.apply(k));
});
return null;
}).schedule(cron,ex);
return t -> lazy.computeIfAbsent(t, fn);
}
代码示例来源:origin: aol/cyclops
.schedule(cron, ex);
代码示例来源:origin: aol/cyclops
@Test
public void cronTest() throws InterruptedException{
of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex);
Thread.sleep(5000);
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronTest() throws InterruptedException{
of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex);
Thread.sleep(5000);
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronTest() throws InterruptedException{
of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex);
Thread.sleep(5000);
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronTest() throws InterruptedException{
ReactiveSeq.of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex);
Thread.sleep(5000);
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronTest() throws InterruptedException{
Spouts.of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex);
Thread.sleep(5000);
}
@Test
代码示例来源:origin: aol/cyclops
static <T> ReactiveSeq<T> schedule(final Stream<T> stream,final String cron,final ScheduledExecutorService exec) {
ReactiveSubscriber<T> sub = reactiveSubscriber();
AtomicBoolean isOpen = new AtomicBoolean(true);
Subscription[] s= {null};
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s[0].request(n);
}
@Override
public void cancel() {
isOpen.set(false);
}
});
s[0] = ReactiveSeq.fromStream(stream)
.takeWhile(e -> isOpen.get())
.schedule(cron, exec)
.connect()
.forEach(0, e -> sub.onNext(e),t->sub.onError(t),()->sub.onComplete());
return sub.reactiveStream();
}
代码示例来源:origin: aol/cyclops
static ReactiveSeq<Integer> interval(String cron,ScheduledExecutorService exec) {
ReactiveSubscriber<Integer> sub = reactiveSubscriber();
AtomicBoolean isOpen = new AtomicBoolean(true);
Subscription[] s= {null};
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s[0].request(n);
}
@Override
public void cancel() {
isOpen.set(false);
}
});
s[0] = ReactiveSeq.iterate(1, a -> a + 1)
.takeWhile(e -> isOpen.get())
.schedule(cron, exec)
.connect()
.forEach(1, e -> sub.onNext(e));
return sub.reactiveStream();
}
static ReactiveSeq<Integer> interval(final long millis,ScheduledExecutorService exec) {
代码示例来源:origin: aol/cyclops
@Test
public void backpressureScheduledCron(){
captured= "";
diff = System.currentTimeMillis();
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
blockingQueue.add("10");
blockingQueue.offer("10");
ReactiveSeq.range(0, Integer.MAX_VALUE)
.limit(2)
.peek(v-> diff = System.currentTimeMillis())
.map(i -> i.toString())
.schedule("* * * * * ?", scheduled)
.connect(blockingQueue)
.onePer(2, TimeUnit.SECONDS)
.peek(i->System.out.println("BQ " + blockingQueue))
.peek(System.out::println)
.forEach(c->captured=c);
assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void backpressureScheduledCron(){
captured= "";
diff = System.currentTimeMillis();
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
blockingQueue.add("10");
blockingQueue.offer("10");
Spouts.range(0, Integer.MAX_VALUE)
.limit(2)
.peek(v-> diff = System.currentTimeMillis())
.map(i -> i.toString())
.schedule("* * * * * ?", scheduled)
.connect(blockingQueue)
.onePer(2, TimeUnit.SECONDS)
.peek(i->System.out.println("BQ " + blockingQueue))
.peek(System.out::println)
.forEach(c->captured=c);
assertThat(System.currentTimeMillis() - diff,greaterThan(995l));
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void reactiveSeq(){
Connectable<String> connectable = ReactiveSeq.of("a", "b", "c", "d", "e")
.peek(x -> System.out.println("peek1:" + x))
.schedule("* * * * * ?", ThreadPools.getStandardSchedular());
System.out.println("resultList:" + connectable.connect().debounce(10, TimeUnit.SECONDS).peek(x->System.out.println("peek2:" + x)).toList() );
}
代码示例来源:origin: aol/cyclops
@Test
public void cronDebounceTest() throws InterruptedException{
assertThat(of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(System.out::println)
.toList(),equalTo(Arrays.asList(1)));
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronDebounceTest() throws InterruptedException{
assertThat(ReactiveSeq.of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(System.out::println)
.toList(),equalTo(Arrays.asList(1)));
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronDebounceTest() throws InterruptedException{
assertThat(of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(System.out::println)
.toList(),equalTo(Arrays.asList(1)));
}
@Test
代码示例来源:origin: aol/cyclops
@Test @Ignore
public void backpressureScheduledCron(){
captured= "";
diff = System.currentTimeMillis();
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
blockingQueue.add("10");
blockingQueue.offer("10");
range(0, Integer.MAX_VALUE)
.limit(2)
.peek(v-> diff = System.currentTimeMillis())
.map(i -> i.toString())
.schedule("* * * * * ?", scheduled)
.connect(blockingQueue)
.onePer(2, TimeUnit.SECONDS)
.peek(i->System.out.println("BQ " + blockingQueue))
.peek(System.out::println)
.forEach(c->captured=c);
assertThat(System.currentTimeMillis() - diff,greaterThan(1500l));
}
@Test @Ignore
代码示例来源:origin: aol/cyclops
@Test
public void cronDebounceTest() throws InterruptedException{
assertThat(of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(System.out::println)
.toList(),equalTo(Arrays.asList(1)));
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void cronDebounceTest() throws InterruptedException{
assertThat(Spouts.of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
.schedule("* * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(System.out::println)
.toList(),equalTo(Arrays.asList(1)));
}
@Test
内容来源于网络,如有侵权,请联系作者删除!