本文整理了Java中cyclops.reactive.ReactiveSeq.scheduleFixedDelay方法的一些代码示例,展示了ReactiveSeq.scheduleFixedDelay的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.scheduleFixedDelay方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:scheduleFixedDelay
[英]Execute this Stream on a schedule
//run every 60 seconds after last job completes
Connect to the Scheduled Stream
Connectable dataStream = ReactiveSeq.generate(() -> "next job:" + formatDate(new Date())).map(this::processJob)
[中]按计划执行此流
//在上一个任务完成后每60秒运行一次
连接到计划流
Connectable dataStream = ReactiveSeq.generate(() -> "next job:" + formatDate(new Date())).map(this::processJob)
代码示例来源:origin: aol/cyclops
.scheduleFixedDelay(delay, ex);
代码示例来源:origin: aol/cyclops
/**
 * Builds a stream of integers (1, 2, 3, ...) whose emission is driven by
 * {@code scheduleFixedDelay(millis, exec)}.
 *
 * <p>NOTE(review): {@code cancel()} only flips the local flag consulted by
 * {@code takeWhile}; it does not call {@code cancel()} on the upstream
 * subscription. Also, the upstream holder is populated after
 * {@code onSubscribe} has been invoked, so a {@code request} arriving during
 * {@code onSubscribe} would find it still null - confirm this cannot happen.
 *
 * @param millis delay handed unchanged to {@code scheduleFixedDelay}
 * @param exec   executor handed unchanged to {@code scheduleFixedDelay}
 * @return the reactive stream exposed by the internal subscriber
 */
static ReactiveSeq<Integer> interval(final long millis, ScheduledExecutorService exec) {
    final ReactiveSubscriber<Integer> subscriber = reactiveSubscriber();
    final AtomicBoolean open = new AtomicBoolean(true);
    // One-slot holder: lets the anonymous Subscription below reach the
    // upstream subscription, which is only assigned further down.
    final Subscription[] upstream = new Subscription[1];

    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            // Demand is forwarded to the scheduled upstream.
            upstream[0].request(n);
        }

        @Override
        public void cancel() {
            // Stops the counter via the takeWhile predicate below.
            open.set(false);
        }
    });

    upstream[0] = ReactiveSeq.iterate(1, current -> current + 1)
                             .takeWhile(ignored -> open.get())
                             .scheduleFixedDelay(millis, exec)
                             .connect()
                             .forEach(1, next -> subscriber.onNext(next));

    return subscriber.reactiveStream();
}
static <T> ReactiveSeq<T> schedule(final Stream<T> stream,final String cron,final ScheduledExecutorService exec) {
代码示例来源:origin: aol/cyclops
/**
 * Schedules two elements from {@code ReactiveSeq.range} with a fixed delay,
 * connects them through a bounded blocking queue, and asserts that more than
 * 1500 ms pass between the last timestamp recorded in {@code diff} and the
 * end of the pipeline.
 */
@Test
public void backpressureScheduledDelay() {
    captured = "";
    diff = System.currentTimeMillis();

    // Capacity-1 queue: add() fills it, so the following offer() cannot
    // insert and simply returns false.
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(1);
    blockingQueue.add("10");
    blockingQueue.offer("10");

    ReactiveSeq.range(0, Integer.MAX_VALUE)
               .limit(2)
               // Record the time at which each element passes this stage.
               .peek(value -> diff = System.currentTimeMillis())
               .peek(value -> System.out.println("diff is " + diff))
               .map(value -> value.toString())
               .scheduleFixedDelay(1L, scheduled)
               .connect(blockingQueue)
               .onePer(1, TimeUnit.SECONDS)
               .peek(item -> System.out.println("BQ " + blockingQueue))
               .peek(System.out::println)
               .forEach(item -> captured = item);

    assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Schedules two elements from {@code Spouts.range} with a fixed delay,
 * connects them through a bounded blocking queue, and asserts that more than
 * 995 ms pass between the last timestamp recorded in {@code diff} and the
 * end of the pipeline.
 */
@Test
public void backpressureScheduledDelay() {
    captured = "";
    diff = System.currentTimeMillis();

    // Capacity-1 queue: add() fills it, so the following offer() cannot
    // insert and simply returns false.
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(1);
    blockingQueue.add("10");
    blockingQueue.offer("10");

    Spouts.range(0, Integer.MAX_VALUE)
          .limit(2)
          // Record the time at which each element passes this stage.
          .peek(value -> diff = System.currentTimeMillis())
          .peek(value -> System.out.println("diff is " + diff))
          .map(value -> value.toString())
          .scheduleFixedDelay(1L, scheduled)
          .connect(blockingQueue)
          .onePer(1, TimeUnit.SECONDS)
          .peek(item -> System.out.println("BQ " + blockingQueue))
          .peek(System.out::println)
          .forEach(item -> captured = item);

    assertThat(System.currentTimeMillis() - diff, greaterThan(995L));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Ignored variant: schedules two elements from the statically imported
 * {@code range} with a fixed delay, connects them through a bounded blocking
 * queue, and asserts that more than 1500 ms pass between the last timestamp
 * recorded in {@code diff} and the end of the pipeline.
 */
@Test
@Ignore
public void backpressureScheduledDelay() {
    captured = "";
    diff = System.currentTimeMillis();

    // Capacity-1 queue: add() fills it, so the following offer() cannot
    // insert and simply returns false.
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(1);
    blockingQueue.add("10");
    blockingQueue.offer("10");

    range(0, Integer.MAX_VALUE)
        .limit(2)
        // Record the time at which each element passes this stage.
        .peek(value -> diff = System.currentTimeMillis())
        .peek(value -> System.out.println("diff is " + diff))
        .map(value -> value.toString())
        .scheduleFixedDelay(1L, scheduled)
        .connect(blockingQueue)
        .onePer(1, TimeUnit.SECONDS)
        .peek(item -> System.out.println("BQ " + blockingQueue))
        .peek(System.out::println)
        .forEach(item -> captured = item);

    assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}
@Test @Ignore
代码示例来源:origin: aol/cyclops
/**
 * Runs four elements through {@code scheduleFixedDelay(1000, ex)}, debounces
 * the connected stream with a one-day window, and asserts that the collected
 * list equals {@code [1]}.
 *
 * <p>NOTE(review): the method name says "fixedRate" but the call under test
 * is {@code scheduleFixedDelay}.
 */
@Test
public void fixedRateDelay() throws InterruptedException {
    assertThat(
        of(1, 2, 3, 4)
            // Count every element that enters the pipeline.
            .peek(element -> count.incrementAndGet())
            .peek(System.out::println)
            .scheduleFixedDelay(1000, ex)
            .connect()
            .debounce(1, TimeUnit.DAYS)
            .peek(System.out::println)
            .toList(),
        equalTo(Arrays.asList(1)));
}
}
代码示例来源:origin: aol/cyclops
/**
 * Runs four elements through {@code scheduleFixedDelay(1000, ex)}, debounces
 * the connected stream with a one-day window, and asserts that the collected
 * list equals {@code [1]}.
 *
 * <p>NOTE(review): the method name says "fixedRate" but the call under test
 * is {@code scheduleFixedDelay}.
 */
@Test
public void fixedRateDelay() throws InterruptedException {
    assertThat(
        of(1, 2, 3, 4)
            // Count every element that enters the pipeline.
            .peek(element -> count.incrementAndGet())
            .peek(System.out::println)
            .scheduleFixedDelay(1000, ex)
            .connect()
            .debounce(1, TimeUnit.DAYS)
            .peek(System.out::println)
            .toList(),
        equalTo(Arrays.asList(1)));
}
}
代码示例来源:origin: aol/cyclops
/**
 * Runs four elements through {@code scheduleFixedDelay(1000, ex)}, debounces
 * the connected stream with a one-day window, and asserts that the collected
 * list equals {@code [1]}.
 *
 * <p>NOTE(review): the method name says "fixedRate" but the call under test
 * is {@code scheduleFixedDelay}.
 */
@Test
public void fixedRateDelay() throws InterruptedException {
    assertThat(
        of(1, 2, 3, 4)
            // Count every element that enters the pipeline.
            .peek(element -> count.incrementAndGet())
            .peek(System.out::println)
            .scheduleFixedDelay(1000, ex)
            .connect()
            .debounce(1, TimeUnit.DAYS)
            .peek(System.out::println)
            .toList(),
        equalTo(Arrays.asList(1)));
}
}
代码示例来源:origin: aol/cyclops
/**
 * Runs four elements from {@code Spouts.of} through
 * {@code scheduleFixedDelay(1000, ex)}, debounces the connected stream with a
 * one-day window, and asserts that the collected list equals {@code [1]}.
 *
 * <p>NOTE(review): the method name says "fixedRate" but the call under test
 * is {@code scheduleFixedDelay}.
 */
@Test
public void fixedRateDelay() throws InterruptedException {
    assertThat(
        Spouts.of(1, 2, 3, 4)
              // Count every element that enters the pipeline.
              .peek(element -> count.incrementAndGet())
              .peek(System.out::println)
              .scheduleFixedDelay(1000, ex)
              .connect()
              .debounce(1, TimeUnit.DAYS)
              .peek(System.out::println)
              .toList(),
        equalTo(Arrays.asList(1)));
}
}
代码示例来源:origin: aol/cyclops
/**
 * Runs four elements from {@code ReactiveSeq.of} through
 * {@code scheduleFixedDelay(1000, ex)}, debounces the connected stream with a
 * one-day window, and asserts that the collected list equals {@code [1]}.
 *
 * <p>NOTE(review): the method name says "fixedRate" but the call under test
 * is {@code scheduleFixedDelay}.
 */
@Test
public void fixedRateDelay() throws InterruptedException {
    assertThat(
        ReactiveSeq.of(1, 2, 3, 4)
                   // Count every element that enters the pipeline.
                   .peek(element -> count.incrementAndGet())
                   .peek(System.out::println)
                   .scheduleFixedDelay(1000, ex)
                   .connect()
                   .debounce(1, TimeUnit.DAYS)
                   .peek(System.out::println)
                   .toList(),
        equalTo(Arrays.asList(1)));
}
}
代码示例来源:origin: aol/cyclops
/**
 * Schedules three elements from {@code ReactiveSeq.range} with a fixed delay,
 * connects them through a {@code ManyToOneConcurrentArrayQueue}, and asserts
 * that the final value of {@code diff} is below 500.
 */
@Test
public void backpressureScheduledDelayNonBlocking() {
    captured = "";
    diff = System.currentTimeMillis();

    Queue<String> blockingQueue = new ManyToOneConcurrentArrayQueue<>(1);

    ReactiveSeq.range(0, Integer.MAX_VALUE)
               .limit(3)
               .peek(value -> System.out.println("diff before is " + diff))
               // Each element replaces diff with (current time - previous diff).
               .peek(value -> diff = System.currentTimeMillis() - diff)
               .peek(value -> System.out.println("diff is " + diff))
               .map(value -> value.toString())
               .scheduleFixedDelay(1L, scheduled)
               .connect(blockingQueue)
               .onePer(1, TimeUnit.SECONDS)
               .peek(item -> System.out.println("BQ " + blockingQueue))
               .peek(System.out::println)
               .forEach(item -> captured = item);

    assertThat(diff, lessThan(500L));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Schedules three elements from {@code Spouts.range} with a fixed delay,
 * connects them through a {@code ManyToOneConcurrentArrayQueue}, and asserts
 * that the final value of {@code diff} is below 500.
 */
@Test
public void backpressureScheduledDelayNonBlocking() {
    captured = "";
    diff = System.currentTimeMillis();

    Queue<String> blockingQueue = new ManyToOneConcurrentArrayQueue<>(1);

    Spouts.range(0, Integer.MAX_VALUE)
          .limit(3)
          .peek(value -> System.out.println("diff before is " + diff))
          // Each element replaces diff with (current time - previous diff).
          .peek(value -> diff = System.currentTimeMillis() - diff)
          .peek(value -> System.out.println("diff is " + diff))
          .map(value -> value.toString())
          .scheduleFixedDelay(1L, scheduled)
          .connect(blockingQueue)
          .onePer(1, TimeUnit.SECONDS)
          .peek(item -> System.out.println("BQ " + blockingQueue))
          .peek(System.out::println)
          .forEach(item -> captured = item);

    assertThat(diff, lessThan(500L));
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Ignored variant: schedules three elements from the statically imported
 * {@code range} with a fixed delay, connects them through a
 * {@code ManyToOneConcurrentArrayQueue}, and asserts that the final value of
 * {@code diff} is below 500.
 */
@Test
@Ignore
public void backpressureScheduledDelayNonBlocking() {
    captured = "";
    diff = System.currentTimeMillis();

    Queue<String> blockingQueue = new ManyToOneConcurrentArrayQueue<>(1);

    range(0, Integer.MAX_VALUE)
        .limit(3)
        .peek(value -> System.out.println("diff before is " + diff))
        // Each element replaces diff with (current time - previous diff).
        .peek(value -> diff = System.currentTimeMillis() - diff)
        .peek(value -> System.out.println("diff is " + diff))
        .map(value -> value.toString())
        .scheduleFixedDelay(1L, scheduled)
        .connect(blockingQueue)
        .onePer(1, TimeUnit.SECONDS)
        .peek(item -> System.out.println("BQ " + blockingQueue))
        .peek(System.out::println)
        .forEach(item -> captured = item);

    assertThat(diff, lessThan(500L));
}
@Test @Ignore
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Wraps this object's {@code stream()} in a {@code ReactiveSeq} and delegates
 * fixed-delay scheduling to it.
 *
 * @param delay delay value forwarded unchanged to the delegate
 * @param ex    executor forwarded unchanged to the delegate
 * @return the {@code Connectable} returned by the delegate call
 */
@Override
public Connectable<U> scheduleFixedDelay(final long delay, final ScheduledExecutorService ex) {
    final ReactiveSeq<U> delegate = ReactiveSeq.<U>fromStream(this.stream());
    return delegate.scheduleFixedDelay(delay, ex);
}
代码示例来源:origin: com.oath.cyclops/cyclops
.scheduleFixedDelay(delay, ex);
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Builds a stream of integers (1, 2, 3, ...) whose emission is driven by
 * {@code scheduleFixedDelay(millis, exec)}.
 *
 * <p>NOTE(review): {@code cancel()} only flips the local flag consulted by
 * {@code takeWhile}; it does not call {@code cancel()} on the upstream
 * subscription. Also, the upstream holder is populated after
 * {@code onSubscribe} has been invoked, so a {@code request} arriving during
 * {@code onSubscribe} would find it still null - confirm this cannot happen.
 *
 * @param millis delay handed unchanged to {@code scheduleFixedDelay}
 * @param exec   executor handed unchanged to {@code scheduleFixedDelay}
 * @return the reactive stream exposed by the internal subscriber
 */
static ReactiveSeq<Integer> interval(final long millis, ScheduledExecutorService exec) {
    final ReactiveSubscriber<Integer> subscriber = reactiveSubscriber();
    final AtomicBoolean open = new AtomicBoolean(true);
    // One-slot holder: lets the anonymous Subscription below reach the
    // upstream subscription, which is only assigned further down.
    final Subscription[] upstream = new Subscription[1];

    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            // Demand is forwarded to the scheduled upstream.
            upstream[0].request(n);
        }

        @Override
        public void cancel() {
            // Stops the counter via the takeWhile predicate below.
            open.set(false);
        }
    });

    upstream[0] = ReactiveSeq.iterate(1, current -> current + 1)
                             .takeWhile(ignored -> open.get())
                             .scheduleFixedDelay(millis, exec)
                             .connect()
                             .forEach(1, next -> subscriber.onNext(next));

    return subscriber.reactiveStream();
}
static <T> ReactiveSeq<T> schedule(final Stream<T> stream,final String cron,final ScheduledExecutorService exec) {
内容来源于网络,如有侵权,请联系作者删除!