cyclops.reactive.ReactiveSeq.spliterator()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(110)

本文整理了Java中cyclops.reactive.ReactiveSeq.spliterator方法的一些代码示例,展示了ReactiveSeq.spliterator的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.spliterator方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:spliterator

ReactiveSeq.spliterator介绍

[英]Returns a spliterator for the elements of this stream.

This is a terminal operation.
[中]返回此流元素的拆分器。
这是一个终端操作(terminal operation)。

代码示例

代码示例来源:origin: aol/cyclops

/**
 * Maps every element of this stream to a {@code long} using {@code fn}.
 *
 * <p>When the backing spliterator is already a {@link Spliterator.OfLong} the
 * primitive source is kept, but the mapper is still applied to each element.
 * Previously that branch returned the raw values and silently ignored
 * {@code fn}, so any non-identity mapper produced wrong results.
 *
 * @param fn mapper applied to each element
 * @return a sequential {@link LongStream} of the mapped values
 */
@Override
@SuppressWarnings("unchecked")
default LongStream mapToLong(ToLongFunction<? super T> fn){
  Spliterator<T> split = this.spliterator();
  if (split instanceof Spliterator.OfLong) {
    // Spliterator.OfLong is a Spliterator<Long>, so each element is a valid T here.
    return StreamSupport.longStream((Spliterator.OfLong) split, false)
                        .map(value -> fn.applyAsLong((T) Long.valueOf(value)));
  }
  return StreamSupport.stream(split, false).mapToLong(fn);
}

代码示例来源:origin: aol/cyclops

/**
 * Maps every element of this stream to a {@code double} using {@code fn}.
 *
 * <p>When the backing spliterator is already a {@link Spliterator.OfDouble} the
 * primitive source is kept, but the mapper is still applied to each element.
 * Previously that branch returned the raw values and silently ignored
 * {@code fn}, so any non-identity mapper produced wrong results.
 *
 * @param fn mapper applied to each element
 * @return a sequential {@link DoubleStream} of the mapped values
 */
@Override
@SuppressWarnings("unchecked")
default DoubleStream mapToDouble(ToDoubleFunction<? super T> fn){
  Spliterator<T> split = this.spliterator();
  if (split instanceof Spliterator.OfDouble) {
    // Spliterator.OfDouble is a Spliterator<Double>, so each element is a valid T here.
    return StreamSupport.doubleStream((Spliterator.OfDouble) split, false)
                        .map(value -> fn.applyAsDouble((T) Double.valueOf(value)));
  }
  return StreamSupport.stream(split, false).mapToDouble(fn);
}

代码示例来源:origin: aol/cyclops

/**
 * Maps every element of this stream to an {@code int} using {@code fn}.
 *
 * <p>When the backing spliterator is already a {@link Spliterator.OfInt} the
 * primitive source is kept, but the mapper is still applied to each element.
 * Previously that branch returned the raw values and silently ignored
 * {@code fn}, so any non-identity mapper produced wrong results.
 *
 * @param fn mapper applied to each element
 * @return a sequential {@link IntStream} of the mapped values
 */
@Override
@SuppressWarnings("unchecked")
default IntStream mapToInt(ToIntFunction<? super T> fn){
  Spliterator<T> split = this.spliterator();
  if (split instanceof Spliterator.OfInt) {
    // Spliterator.OfInt is a Spliterator<Integer>, so each element is a valid T here.
    return StreamSupport.intStream((Spliterator.OfInt) split, false)
                        .map(value -> fn.applyAsInt((T) Integer.valueOf(value)));
  }
  return StreamSupport.stream(split, false).mapToInt(fn);
}

代码示例来源:origin: aol/cyclops

/**
 * Builds a sequence that concatenates the supplied values with whatever
 * {@code get()} yields, passing the supplied values first.
 *
 * @param other values passed as the first argument to the concatenation
 * @return the concatenated sequence
 */
@Override
public ReactiveSeq<T> prependAll(final T... other) {
  final ReactiveSeq<T> prefix = ReactiveSeq.of(other);
  return ReactiveSeq.concat(prefix.spliterator(), get());
}

代码示例来源:origin: aol/cyclops

/**
 * Pushes this stream through a queue and hands a parallel JDK stream over that
 * queue to {@code fn}.
 *
 * <p>The queue is given a continuation; the first invocation of that
 * continuation offers every element of this stream's spliterator to the queue
 * and then closes the queue. Later invocations do nothing.
 *
 * @param fn function applied to the parallel stream read from the queue
 * @param <R> result type
 * @return whatever {@code fn} returns
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
  final Queue<T> transfer = QueueFactories.<T>unboundedNonBlockingQueue()
                                          .build()
                                          .withTimeout(1);
  final AtomicReference<Continuation> claimed = new AtomicReference<>(null);
  final Continuation drain = new Continuation(() -> {
    // Only the caller that wins the compare-and-set performs the drain.
    final boolean firstCaller = claimed.get() == null
        && claimed.compareAndSet(null, Continuation.empty());
    if (!firstCaller) {
      return Continuation.empty();
    }
    try {
      this.spliterator().forEachRemaining(transfer::offer);
    } finally {
      // Close even if the source throws.
      transfer.close();
    }
    return Continuation.empty();
  });
  transfer.addContinuation(drain);
  return fn.apply(transfer.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){

代码示例来源:origin: aol/cyclops

@Test
public void cancel(){
  Spouts.fromSpliterator(ReactiveSeq.iterate(0l,i->i+1l).spliterator())
      .limit(5)
      .filter(i->true).subscribe(new Subscriber<Long>() {

代码示例来源:origin: aol/cyclops

/**
 * Subscribes to a spliterator-backed operator over three values, requests one
 * element, and asserts that {@code values} then holds exactly one entry.
 */
@Test
public void request1(){
  final SpliteratorToOperator<Integer> operator =
      new SpliteratorToOperator<Integer>(ReactiveSeq.of(1,2,3).spliterator());

  operator.subscribe(values::add, errors::add, () -> onComplete = true)
          .request(1L);

  assertThat(values.size(), equalTo(1));
}

代码示例来源:origin: aol/cyclops

@Test
public void cancel2(){
  Spouts.fromSpliterator(ReactiveSeq.iterate(0l,i->i+1l).spliterator())
      .limit(3)
      .filter(i->true).subscribe(new Subscriber<Long>() {

代码示例来源:origin: aol/cyclops

/**
 * Creates a {@link Topic} fed from this stream's spliterator.
 *
 * <p>A continuation is attached to the topic's backing queue. Each invocation
 * that wins the {@code busy} flag advances the spliterator by at most one
 * element, offering it to the topic. When the spliterator is exhausted the
 * topic is closed and an empty continuation is returned; otherwise the
 * continuation returns itself.
 *
 * @return the topic backed by this stream
 */
default Topic<T> broadcast(){
  final Queue<T> backing = QueueFactories.<T>unboundedNonBlockingQueue()
                                         .build()
                                         .withTimeout(1);
  final Topic<T> topic = new Topic<T>(backing, QueueFactories.<T>unboundedNonBlockingQueue());
  final AtomicBoolean busy = new AtomicBoolean(false);
  final Spliterator<T> source = this.spliterator();
  // One-element holder so the lambda can refer to the continuation it belongs to.
  final Continuation[] self = new Continuation[1];
  self[0] = new Continuation(() -> {
    if (!busy.compareAndSet(false, true)) {
      // Another caller is currently advancing the source.
      return self[0];
    }
    try {
      if (!source.tryAdvance(topic::offer)) {
        topic.close();
        return Continuation.empty();
      }
    } finally {
      busy.set(false);
    }
    return self[0];
  });
  backing.addContinuation(self[0]);
  return topic;
}

代码示例来源:origin: aol/cyclops

/**
 * Subscribes to a spliterator-backed operator over {@code fill(10).limit(100)},
 * requests two elements, and asserts that {@code values} then holds exactly
 * two entries.
 */
@Test
public void requestTwo(){
  final SpliteratorToOperator<Integer> operator =
      new SpliteratorToOperator<Integer>(ReactiveSeq.fill(10).limit(100).spliterator());

  operator.subscribe(values::add, errors::add, () -> onComplete = true)
          .request(2L);

  assertThat(values.size(), equalTo(2));
}

代码示例来源:origin: aol/cyclops

/**
 * Requests one element through a pass-everything filter, then cancels and
 * requests again. After each step it asserts that {@code values} holds one
 * entry and that {@code onComplete} is still false.
 */
@Test
public void requestOne(){
  final SpliteratorToOperator<Integer> source =
      new SpliteratorToOperator<Integer>(ReactiveSeq.fill(10).limit(100).spliterator());
  final Subscription sub = new FilterOperator<>(source, i -> true)
      .subscribe(values::add, errors::add, () -> onComplete = true);

  // First request.
  sub.request(1L);
  assertThat(values.size(), equalTo(1));
  assertFalse(onComplete);

  // Request again after cancelling; the same assertions are repeated.
  sub.cancel();
  sub.request(1L);
  assertThat(values.size(), equalTo(1));
  assertFalse(onComplete);
}
@Test

代码示例来源:origin: aol/cyclops

/**
 * Creates a publisher over the sequence 0, 1, 2, ... limited to
 * {@code elements} items and passed through a filter that accepts everything.
 *
 * @param elements limit applied to the generated sequence
 * @return the limited, filtered publisher
 */
@Override
public Publisher<Long> createPublisher(long elements) {
  final ReactiveSeq<Long> naturals = ReactiveSeq.iterate(0L, i -> i + 1L);
  return Spouts.fromSpliterator(naturals.spliterator())
               .limit(elements)
               .filter(i -> true);
}

代码示例来源:origin: aol/cyclops

/**
 * Takes one spliterator from a mapped two-element stream and wraps it in a
 * {@code ReactiveSeq} twice, printing first to standard output and then to
 * standard error.
 */
@Test
public void replayStream(){
  final ReactiveSeq<String> words = of("hello","world");
  final ReactiveSeq<String> greetings = words.map(word -> "hello world " + word);
  final Spliterator<String> shared = greetings.spliterator();

  // The same spliterator instance is consumed by both sequences.
  ReactiveSeq.fromSpliterator(shared)
             .forEach(System.out::println);
  ReactiveSeq.fromSpliterator(shared)
             .forEach(System.err::println);
}
@Test

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Maps every element of this stream to a {@code double} using {@code fn}.
 *
 * <p>When the backing spliterator is already a {@link Spliterator.OfDouble} the
 * primitive source is kept, but the mapper is still applied to each element.
 * Previously that branch returned the raw values and silently ignored
 * {@code fn}, so any non-identity mapper produced wrong results.
 *
 * @param fn mapper applied to each element
 * @return a sequential {@link DoubleStream} of the mapped values
 */
@Override
@SuppressWarnings("unchecked")
default DoubleStream mapToDouble(ToDoubleFunction<? super T> fn){
  Spliterator<T> split = this.spliterator();
  if (split instanceof Spliterator.OfDouble) {
    // Spliterator.OfDouble is a Spliterator<Double>, so each element is a valid T here.
    return StreamSupport.doubleStream((Spliterator.OfDouble) split, false)
                        .map(value -> fn.applyAsDouble((T) Double.valueOf(value)));
  }
  return StreamSupport.stream(split, false).mapToDouble(fn);
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Maps every element of this stream to an {@code int} using {@code fn}.
 *
 * <p>When the backing spliterator is already a {@link Spliterator.OfInt} the
 * primitive source is kept, but the mapper is still applied to each element.
 * Previously that branch returned the raw values and silently ignored
 * {@code fn}, so any non-identity mapper produced wrong results.
 *
 * @param fn mapper applied to each element
 * @return a sequential {@link IntStream} of the mapped values
 */
@Override
@SuppressWarnings("unchecked")
default IntStream mapToInt(ToIntFunction<? super T> fn){
  Spliterator<T> split = this.spliterator();
  if (split instanceof Spliterator.OfInt) {
    // Spliterator.OfInt is a Spliterator<Integer>, so each element is a valid T here.
    return StreamSupport.intStream((Spliterator.OfInt) split, false)
                        .map(value -> fn.applyAsInt((T) Integer.valueOf(value)));
  }
  return StreamSupport.stream(split, false).mapToInt(fn);
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Maps every element of this stream to a {@code long} using {@code fn}.
 *
 * <p>When the backing spliterator is already a {@link Spliterator.OfLong} the
 * primitive source is kept, but the mapper is still applied to each element.
 * Previously that branch returned the raw values and silently ignored
 * {@code fn}, so any non-identity mapper produced wrong results.
 *
 * @param fn mapper applied to each element
 * @return a sequential {@link LongStream} of the mapped values
 */
@Override
@SuppressWarnings("unchecked")
default LongStream mapToLong(ToLongFunction<? super T> fn){
  Spliterator<T> split = this.spliterator();
  if (split instanceof Spliterator.OfLong) {
    // Spliterator.OfLong is a Spliterator<Long>, so each element is a valid T here.
    return StreamSupport.longStream((Spliterator.OfLong) split, false)
                        .map(value -> fn.applyAsLong((T) Long.valueOf(value)));
  }
  return StreamSupport.stream(split, false).mapToLong(fn);
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Returns the spliterator of the stream obtained from {@code stream()}.
 *
 * @return the spliterator produced by {@code stream().spliterator()}
 */
@Override
default Spliterator<U> spliterator() {
  return stream().spliterator();
}

代码示例来源:origin: aol/cyclops

/**
 * Takes one spliterator from a mapped push-based stream, then twice pushes a
 * value, signals completion, and consumes the same spliterator through a new
 * {@code ReactiveSeq}, printing first to standard output and then to standard
 * error.
 */
@Test
public void replay(){
  final ReactiveSubscriber<String> publisher = Spouts.reactiveSubscriber();
  final ReactiveSeq<String> pushed = publisher.reactiveStream();
  final ReactiveSeq<String> greetings = pushed.map(word -> "hello world " + word);
  final Spliterator<String> shared = greetings.spliterator();

  // First round: push, complete, consume to stdout.
  publisher.onNext("hello");
  publisher.onComplete();
  ReactiveSeq.fromSpliterator(shared)
             .forEach(System.out::println);

  // Second round on the same spliterator: push, complete, consume to stderr.
  publisher.onNext("world");
  publisher.onComplete();
  ReactiveSeq.fromSpliterator(shared)
             .forEach(System.err::println);
}
@Test

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Builds a sequence that concatenates the supplied values with whatever
 * {@code get()} yields, passing the supplied values first.
 *
 * @param other values passed as the first argument to the concatenation
 * @return the concatenated sequence
 */
@Override
public ReactiveSeq<T> prependAll(final T... other) {
  final ReactiveSeq<T> prefix = ReactiveSeq.of(other);
  return ReactiveSeq.concat(prefix.spliterator(), get());
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Pushes this stream through a queue and hands a parallel JDK stream over that
 * queue to {@code fn}.
 *
 * <p>The queue is given a continuation; the first invocation of that
 * continuation offers every element of this stream's spliterator to the queue
 * and then closes the queue. Later invocations do nothing.
 *
 * @param fn function applied to the parallel stream read from the queue
 * @param <R> result type
 * @return whatever {@code fn} returns
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
  final Queue<T> transfer = QueueFactories.<T>unboundedNonBlockingQueue()
                                          .build()
                                          .withTimeout(1);
  final AtomicReference<Continuation> claimed = new AtomicReference<>(null);
  final Continuation drain = new Continuation(() -> {
    // Only the caller that wins the compare-and-set performs the drain.
    final boolean firstCaller = claimed.get() == null
        && claimed.compareAndSet(null, Continuation.empty());
    if (!firstCaller) {
      return Continuation.empty();
    }
    try {
      this.spliterator().forEachRemaining(transfer::offer);
    } finally {
      // Close even if the source throws.
      transfer.close();
    }
    return Continuation.empty();
  });
  transfer.addContinuation(drain);
  return fn.apply(transfer.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){

相关文章

微信公众号

最新文章

更多

ReactiveSeq类方法