Usage and code examples for the cyclops.reactive.ReactiveSeq.schedule() method

Reposted by x33g5p2x on 2022-01-29 under Other

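This article collects code examples for the Java method cyclops.reactive.ReactiveSeq.schedule and shows how ReactiveSeq.schedule is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms and were extracted from selected projects, so they should serve as a useful reference. Details of the ReactiveSeq.schedule method:
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: schedule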

About ReactiveSeq.schedule

Execute this Stream on a schedule

//run at 8PM every night
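
For comparison, a nightly 8 PM job can be approximated with nothing but the JDK. The sketch below is not part of cyclops: `NightlyDelay`, `millisUntilNext`, and `scheduleNightly` are hypothetical names, and the code ignores time zones and daylight-saving shifts, which a real cron scheduler has to handle.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a cyclops API: works out when the next nightly run is due.
class NightlyDelay {

    // Milliseconds from 'now' until the next occurrence of 'runAt'.
    // If 'runAt' has already passed today (or is exactly now), the next run is tomorrow.
    static long millisUntilNext(LocalTime runAt, LocalDateTime now) {
        LocalDateTime next = now.toLocalDate().atTime(runAt);
        if (!next.isAfter(now)) {
            next = next.plusDays(1);
        }
        return Duration.between(now, next).toMillis();
    }

    // First run at the next 'runAt', then once every 24 hours.
    static ScheduledFuture<?> scheduleNightly(ScheduledExecutorService exec, Runnable job, LocalTime runAt) {
        long initialDelay = millisUntilNext(runAt, LocalDateTime.now());
        return exec.scheduleAtFixedRate(job, initialDelay,
                TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS);
    }
}
```

Calling `NightlyDelay.scheduleNightly(exec, job, LocalTime.of(20, 0))` would then run `job` at the next 8 PM and every 24 hours after that, until the returned future is cancelled or the executor is shut down.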

Connect to the Scheduled Stream

Connectable dataStream = ReactiveSeq.generate(() -> "next job:" + formatDate(new Date())).map(this::processJob)
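
The tests further down suggest that a scheduled stream hands over one element per trigger. A minimal JDK-only sketch of that idea follows, assuming a fixed tick in place of a cron expression. It is not how cyclops implements schedule(): `TickDrivenEmitter`, `emitOnePerTick`, and `demo` are hypothetical names, and the `sink` consumer stands in for whatever connects to the stream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a scheduled stream; an illustration only.
class TickDrivenEmitter {

    // Hands one element of 'source' to 'sink' per tick.
    // The returned latch opens once the source is exhausted.
    static <T> CountDownLatch emitOnePerTick(Iterator<T> source, long tickMillis,
                                             ScheduledExecutorService exec, Consumer<T> sink) {
        CountDownLatch done = new CountDownLatch(1);
        Runnable tick = new Runnable() {
            @Override
            public void run() {
                if (!source.hasNext()) {
                    done.countDown(); // nothing left: stop rescheduling
                    return;
                }
                sink.accept(source.next());
                // A cron scheduler would compute the delay to the next trigger here.
                exec.schedule(this, tickMillis, TimeUnit.MILLISECONDS);
            }
        };
        exec.schedule(tick, tickMillis, TimeUnit.MILLISECONDS);
        return done;
    }

    // Emits 1..4 with a 10 ms tick and returns what the sink received.
    static List<Integer> demo() {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        List<Integer> seen = new CopyOnWriteArrayList<>();
        try {
            CountDownLatch done =
                    emitOnePerTick(Arrays.asList(1, 2, 3, 4).iterator(), 10, exec, seen::add);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdownNow();
        }
        return seen;
    }
}
```

Rescheduling from inside the task, rather than using a fixed-rate schedule, mirrors what a cron-driven scheduler needs to do, since the gap between triggers is not necessarily constant.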

Code examples

Code example source: aol/micro-server

public void schedule() {
  cleaner.forEach(cl -> {
    // one element per cron trigger; clean only when the condition allows it
    ReactiveSeq.generate(() -> 1)
        .filter(in -> condition.shouldClean())
        .map(i -> cl.scheduleAndLog())
        .peek(sd -> bus.post(sd))
        .schedule(cl.getCron(), executor);
  });
}

Code example source: aol/micro-server

public void schedule() {
  loader.forEach(dl -> {
    // run on startup
    create(dl).limit(1).foldFuture(executor, s -> s.forEach(Long.MAX_VALUE, l -> {}));
    // schedule
    create(dl).schedule(dl.getCron(), executor);
  });
}

Code example source: aol/cyclops

/**
 * Memoize this function and update cached values on a schedule
 * Does not support null keys
 *
 * @param fn  Function to Memoize
 * @param ex Scheduled Executor Service
 * @param cron Cron expression for updating cached values asynchronously
 * @param <T> Input Type of Function
 * @param <R> Return type of Function
 * @return Memoized asynchronously updating function
 */
public static <T, R> Function1<T, R> memoizeFunctionAsync(final Function<T, R> fn, ScheduledExecutorService ex, String cron) {
  final Map<T, R> lazy = new ConcurrentHashMap<>();
  ReactiveSeq.generate(()->{
    lazy.forEach((k,v)->{
      lazy.put(k,fn.apply(k));
    });
    return null;
  }).schedule(cron,ex);
  return t -> lazy.computeIfAbsent(t, fn);
}
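
The same idea, a memoizing lookup whose cached values are recomputed in the background, can be sketched with the JDK alone. This is an illustration under assumptions, not the cyclops implementation: `RefreshingCache` and its methods are hypothetical names, a fixed period replaces the cron expression, and, as with the method above, null keys and null results are not supported because the backing map is a ConcurrentHashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical class for illustration: memoizes 'loader' and can refresh cached values.
class RefreshingCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    RefreshingCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Computes the value on first access, then serves it from the cache.
    V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    // Recomputes every cached value; 'loader' must not return null here.
    void refreshAll() {
        cache.replaceAll((key, stale) -> loader.apply(key));
    }

    // Runs refreshAll() periodically on the given executor.
    ScheduledFuture<?> refreshEvery(long period, TimeUnit unit, ScheduledExecutorService exec) {
        return exec.scheduleAtFixedRate(this::refreshAll, period, period, unit);
    }
}
```

Callers keep reading the previous value until a refresh replaces it, so a slow loader delays freshness but never blocks a cache hit.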

Code example source: aol/cyclops

.schedule(cron, ex);

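Code example source: aol/cyclops

@Test
public void cronTest() throws InterruptedException {
  // "* * * * * ?" is a Quartz-style expression that triggers every second
  of(1, 2, 3, 4)
      .peek(i -> count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex);

  // give the scheduler time to run before the test returns
  Thread.sleep(5000);
}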

Code example source: aol/cyclops

@Test
public void cronTest() throws InterruptedException {
  ReactiveSeq.of(1, 2, 3, 4)
      .peek(i -> count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex);
  Thread.sleep(5000);
}

Code example source: aol/cyclops

@Test
public void cronTest() throws InterruptedException {
  Spouts.of(1, 2, 3, 4)
      .peek(i -> count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex);

  Thread.sleep(5000);
}

Code example source: aol/cyclops

static  <T> ReactiveSeq<T> schedule(final Stream<T> stream,final String cron,final ScheduledExecutorService exec) {
  ReactiveSubscriber<T> sub = reactiveSubscriber();
  AtomicBoolean isOpen = new AtomicBoolean(true);
  Subscription[] s= {null};
  sub.onSubscribe(new Subscription() {
    @Override
    public void request(long n) {
      s[0].request(n);
    }
    @Override
    public void cancel() {
      isOpen.set(false);
    }
  });
  s[0] = ReactiveSeq.fromStream(stream)
            .takeWhile(e -> isOpen.get())
            .schedule(cron, exec)
            .connect()
            .forEach(0, e -> sub.onNext(e),t->sub.onError(t),()->sub.onComplete());
  return sub.reactiveStream();
}

Code example source: aol/cyclops

static  ReactiveSeq<Integer> interval(String cron,ScheduledExecutorService exec) {
  ReactiveSubscriber<Integer> sub = reactiveSubscriber();
  AtomicBoolean isOpen = new AtomicBoolean(true);
  Subscription[] s= {null};
  sub.onSubscribe(new Subscription() {
    @Override
    public void request(long n) {
      s[0].request(n);
    }
    @Override
    public void cancel() {
      isOpen.set(false);
    }
  });
  s[0] = ReactiveSeq.iterate(1, a -> a + 1)
           .takeWhile(e -> isOpen.get())
           .schedule(cron, exec)
           .connect()
           .forEach(1, e -> sub.onNext(e));
  return sub.reactiveStream();
}
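
Both methods above stop the upstream by flipping an AtomicBoolean that a takeWhile stage checks before each element. A minimal JDK-only sketch of that cancellation pattern follows, assuming Java 9 or later for `Stream.takeWhile`; `CancellableCounter` and `takeUntilCancelled` are hypothetical names, and the consumer cancels itself here only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical illustration of a cancellation flag guarding an infinite stream.
class CancellableCounter {

    // Counts upward from 1 and "cancels" once 'cancelAfter' has been consumed.
    static List<Integer> takeUntilCancelled(int cancelAfter) {
        AtomicBoolean isOpen = new AtomicBoolean(true);
        List<Integer> consumed = new ArrayList<>();
        Stream.iterate(1, a -> a + 1)
              .takeWhile(e -> isOpen.get())      // checked before each element is passed on
              .forEach(e -> {
                  consumed.add(e);
                  if (e == cancelAfter) {
                      isOpen.set(false);         // plays the role of Subscription.cancel()
                  }
              });
        return consumed;
    }
}
```

Because the flag is only read when the next element is about to be emitted, cancellation takes effect at the next trigger rather than immediately.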

Code example source: aol/cyclops

@Test
public void backpressureScheduledCron() {
  captured = "";
  diff = System.currentTimeMillis();
  // capacity-1 queue: add() fills it, so the following offer() is rejected
  LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
  blockingQueue.add("10");
  blockingQueue.offer("10");
  ReactiveSeq.range(0, Integer.MAX_VALUE)
      .limit(2)
      .peek(v -> diff = System.currentTimeMillis())
      .map(i -> i.toString())
      .schedule("* * * * * ?", scheduled)
      .connect(blockingQueue)
      .onePer(2, TimeUnit.SECONDS)
      .peek(i -> System.out.println("BQ " + blockingQueue))
      .peek(System.out::println)
      .forEach(c -> captured = c);
  assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}
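
The test above relies on a LinkedBlockingQueue with capacity 1 to create backpressure. The JDK-only sketch below shows how such a queue reacts once it is full; `BoundedQueueDemo` and `fillBeyondCapacity` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of add() versus offer() on a full bounded queue.
class BoundedQueueDemo {

    // Returns {result of the first add, result of offer, whether a second add threw}.
    static boolean[] fillBeyondCapacity() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        boolean added = queue.add("10");     // the single slot is free, so this succeeds
        boolean offered = queue.offer("10"); // queue is full: offer() rejects without blocking
        boolean addThrew = false;
        try {
            queue.add("10");                 // add() reports a full queue with an exception
        } catch (IllegalStateException full) {
            addThrew = true;
        }
        return new boolean[] {added, offered, addThrew};
    }
}
```

A producer that should wait instead of dropping elements would call `put`, which blocks until a consumer frees a slot.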

Code example source: aol/cyclops

@Test
public void backpressureScheduledCron() {
  captured = "";
  diff = System.currentTimeMillis();
  LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
  blockingQueue.add("10");
  blockingQueue.offer("10");
  Spouts.range(0, Integer.MAX_VALUE)
      .limit(2)
      .peek(v -> diff = System.currentTimeMillis())
      .map(i -> i.toString())
      .schedule("* * * * * ?", scheduled)
      .connect(blockingQueue)
      .onePer(2, TimeUnit.SECONDS)
      .peek(i -> System.out.println("BQ " + blockingQueue))
      .peek(System.out::println)
      .forEach(c -> captured = c);
  assertThat(System.currentTimeMillis() - diff, greaterThan(995L));
}

Code example source: aol/cyclops

@Test
public void reactiveSeq() {
  Connectable<String> connectable = ReactiveSeq.of("a", "b", "c", "d", "e")
      .peek(x -> System.out.println("peek1:" + x))
      .schedule("* * * * * ?", ThreadPools.getStandardSchedular());
  System.out.println("resultList:" + connectable.connect()
      .debounce(10, TimeUnit.SECONDS)
      .peek(x -> System.out.println("peek2:" + x))
      .toList());
}

Code example source: aol/cyclops

@Test
public void cronDebounceTest() throws InterruptedException {
  // with a one-day debounce window only the first element gets through
  assertThat(of(1, 2, 3, 4)
      .peek(i -> count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex)
      .connect()
      .debounce(1, TimeUnit.DAYS)
      .peek(System.out::println)
      .toList(), equalTo(Arrays.asList(1)));
}
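
The expected result `[1]` suggests that debounce passes the first element of each time window and drops the rest. The sketch below models that behaviour on explicit arrival times so it can run without a scheduler. It is an assumption-based illustration, not the cyclops implementation; `DebounceSketch` and its `debounce` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of debounce over elements with known arrival times.
class DebounceSketch {

    // Keeps items.get(i) only if it is the first element, or if at least
    // 'windowMillis' have passed since the last element that was kept.
    static <T> List<T> debounce(List<T> items, long[] arrivalMillis, long windowMillis) {
        List<T> passed = new ArrayList<>();
        long lastPassedAt = 0;
        for (int i = 0; i < items.size(); i++) {
            boolean windowElapsed = arrivalMillis[i] - lastPassedAt >= windowMillis;
            if (passed.isEmpty() || windowElapsed) {
                passed.add(items.get(i));
                lastPassedAt = arrivalMillis[i];
            }
        }
        return passed;
    }
}
```

With four elements arriving one second apart, a one-day window keeps only the first element, while a 1.5-second window keeps the first and the third.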

Code example source: aol/cyclops

@Test
public void cronDebounceTest() throws InterruptedException {
  assertThat(ReactiveSeq.of(1, 2, 3, 4)
      .peek(i -> count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex)
      .connect()
      .debounce(1, TimeUnit.DAYS)
      .peek(System.out::println)
      .toList(), equalTo(Arrays.asList(1)));
}

Code example source: aol/cyclops

@Test @Ignore
public void backpressureScheduledCron() {
  captured = "";
  diff = System.currentTimeMillis();
  LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(1);
  blockingQueue.add("10");
  blockingQueue.offer("10");
  range(0, Integer.MAX_VALUE)
      .limit(2)
      .peek(v -> diff = System.currentTimeMillis())
      .map(i -> i.toString())
      .schedule("* * * * * ?", scheduled)
      .connect(blockingQueue)
      .onePer(2, TimeUnit.SECONDS)
      .peek(i -> System.out.println("BQ " + blockingQueue))
      .peek(System.out::println)
      .forEach(c -> captured = c);
  assertThat(System.currentTimeMillis() - diff, greaterThan(1500L));
}

Code example source: aol/cyclops

@Test
public void cronDebounceTest() throws InterruptedException {
  assertThat(Spouts.of(1, 2, 3, 4)
      .peek(i -> count.incrementAndGet())
      .peek(System.out::println)
      .schedule("* * * * * ?", ex)
      .connect()
      .debounce(1, TimeUnit.DAYS)
      .peek(System.out::println)
      .toList(), equalTo(Arrays.asList(1)));
}
