本文整理了Java中cyclops.reactive.ReactiveSeq.spliterator
方法的一些代码示例,展示了ReactiveSeq.spliterator
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.spliterator
方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:spliterator
[英]Returns a spliterator for the elements of this stream.
This is a terminal operation.
[中]返回此流元素的拆分器。
这是一个终端操作(terminal operation)。
代码示例来源:origin: aol/cyclops
/**
 * Exposes this sequence as a sequential {@link LongStream}.
 *
 * <p>NOTE: when this sequence's spliterator is already a
 * {@code Spliterator.OfLong} it is wrapped directly and {@code fn} is not
 * applied; {@code fn} is only used for the non-primitive spliterator case.
 *
 * @param fn maps each element to a {@code long} when the spliterator is not
 *           already long-specialised
 * @return a sequential LongStream over this sequence's elements
 */
@Override
default LongStream mapToLong(ToLongFunction<? super T> fn){
    final Spliterator<T> source = this.spliterator();
    if (source instanceof Spliterator.OfLong) {
        // Primitive fast path: reuse the long spliterator as-is.
        return StreamSupport.longStream((Spliterator.OfLong) source, false);
    }
    return StreamSupport.stream(source, false).mapToLong(fn);
}
代码示例来源:origin: aol/cyclops
/**
 * Exposes this sequence as a sequential {@link DoubleStream}.
 *
 * <p>NOTE: when this sequence's spliterator is already a
 * {@code Spliterator.OfDouble} it is wrapped directly and {@code fn} is not
 * applied; {@code fn} is only used for the non-primitive spliterator case.
 *
 * @param fn maps each element to a {@code double} when the spliterator is
 *           not already double-specialised
 * @return a sequential DoubleStream over this sequence's elements
 */
@Override
default DoubleStream mapToDouble(ToDoubleFunction<? super T> fn){
    final Spliterator<T> source = this.spliterator();
    if (source instanceof Spliterator.OfDouble) {
        // Primitive fast path: reuse the double spliterator as-is.
        return StreamSupport.doubleStream((Spliterator.OfDouble) source, false);
    }
    return StreamSupport.stream(source, false).mapToDouble(fn);
}
代码示例来源:origin: aol/cyclops
/**
 * Exposes this sequence as a sequential {@link IntStream}.
 *
 * <p>NOTE: when this sequence's spliterator is already a
 * {@code Spliterator.OfInt} it is wrapped directly and {@code fn} is not
 * applied; {@code fn} is only used for the non-primitive spliterator case.
 *
 * @param fn maps each element to an {@code int} when the spliterator is not
 *           already int-specialised
 * @return a sequential IntStream over this sequence's elements
 */
@Override
default IntStream mapToInt(ToIntFunction<? super T> fn){
    final Spliterator<T> source = this.spliterator();
    if (source instanceof Spliterator.OfInt) {
        // Primitive fast path: reuse the int spliterator as-is.
        return StreamSupport.intStream((Spliterator.OfInt) source, false);
    }
    return StreamSupport.stream(source, false).mapToInt(fn);
}
代码示例来源:origin: aol/cyclops
/**
 * Builds a sequence consisting of the supplied values followed by whatever
 * {@code get()} provides for this sequence.
 *
 * @param other values to place in front
 * @return the concatenation of {@code other} and this sequence's source
 */
@Override
public ReactiveSeq<T> prependAll(final T... other) {
    final ReactiveSeq<T> prefix = ReactiveSeq.of(other);
    // The prefix spliterator is passed first so its elements come first.
    return ReactiveSeq.concat(prefix.spliterator(), get());
}
代码示例来源:origin: aol/cyclops
/**
 * Pushes this sequence onto a queue and applies {@code fn} to a parallel
 * JDK stream view of that queue.
 *
 * <p>A continuation is registered on the queue. The first invocation of it
 * that wins the compare-and-set drains this sequence's spliterator into the
 * queue and then closes the queue (also when draining throws); every
 * invocation returns {@code Continuation.empty()}.
 *
 * @param fn function applied to the parallel stream read from the queue
 * @param <R> result type
 * @return whatever {@code fn} returns
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
    Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                   .build()
                                   .withTimeout(1);

    // Stays null until one invocation of the continuation claims the drain.
    AtomicReference<Continuation> claimed = new AtomicReference<>(null);

    Continuation feeder = new Continuation(() -> {
        boolean firstCaller = claimed.get() == null
                && claimed.compareAndSet(null, Continuation.empty());
        if (!firstCaller) {
            return Continuation.empty();
        }
        try {
            // The first thread to run this continuation pushes the whole Stream onto the Queue.
            this.spliterator().forEachRemaining(queue::offer);
        } finally {
            queue.close();
        }
        return Continuation.empty();
    });

    queue.addContinuation(feeder);
    return fn.apply(queue.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){
代码示例来源:origin: aol/cyclops
@Test
public void cancel(){
Spouts.fromSpliterator(ReactiveSeq.iterate(0l,i->i+1l).spliterator())
.limit(5)
.filter(i->true).subscribe(new Subscriber<Long>() {
代码示例来源:origin: aol/cyclops
// Requesting a single element from a three-element source should deliver exactly one value.
@Test
public void request1(){
    ReactiveSeq<Integer> source = ReactiveSeq.of(1,2,3);

    new SpliteratorToOperator<Integer>(source.spliterator())
            .subscribe(values::add, errors::add, () -> onComplete = true)
            .request(1L);

    assertThat(values.size(), equalTo(1));
}
代码示例来源:origin: aol/cyclops
@Test
public void cancel2(){
Spouts.fromSpliterator(ReactiveSeq.iterate(0l,i->i+1l).spliterator())
.limit(3)
.filter(i->true).subscribe(new Subscriber<Long>() {
代码示例来源:origin: aol/cyclops
/**
 * Creates a {@code Topic} that is fed from this sequence's spliterator.
 *
 * <p>A continuation is registered on the topic's backing queue. Each
 * invocation that wins the {@code AtomicBoolean} guard advances the
 * spliterator by one element, offering it to the topic. When the
 * spliterator is exhausted the topic is closed and
 * {@code Continuation.empty()} is returned; otherwise the continuation
 * returns itself.
 *
 * @return the topic backed by this sequence
 */
default Topic<T> broadcast(){
    Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue().build().withTimeout(1);
    Topic<T> topic = new Topic<T>(queue, QueueFactories.<T>unboundedNonBlockingQueue());
    AtomicBoolean busy = new AtomicBoolean(false);
    Spliterator<T> source = this.spliterator();

    // One-slot holder so the lambda can return the continuation it belongs to.
    Continuation[] self = new Continuation[1];

    Continuation pump = new Continuation(() -> {
        if (!busy.compareAndSet(false, true)) {
            // Another invocation is currently advancing the spliterator.
            return self[0];
        }
        try {
            // The thread that wins the guard pushes the next element onto the Topic.
            boolean advanced = source.tryAdvance(topic::offer);
            if (!advanced) {
                topic.close();
                return Continuation.empty();
            }
        } finally {
            busy.set(false);
        }
        return self[0];
    });

    self[0] = pump;
    queue.addContinuation(pump);
    return topic;
}
代码示例来源:origin: aol/cyclops
// Requesting two elements from a 100-element source should deliver exactly two values.
@Test
public void requestTwo(){
    ReactiveSeq<Integer> tens = ReactiveSeq.fill(10).limit(100);

    new SpliteratorToOperator<Integer>(tens.spliterator())
            .subscribe(values::add, errors::add, () -> onComplete = true)
            .request(2L);

    assertThat(values.size(), equalTo(2));
}
代码示例来源:origin: aol/cyclops
// After one request and a cancel, a further request must not deliver more values or complete.
@Test
public void requestOne(){
    ReactiveSeq<Integer> tens = ReactiveSeq.fill(10).limit(100);
    Subscription sub = new FilterOperator<>(new SpliteratorToOperator<Integer>(tens.spliterator()), i -> true)
            .subscribe(values::add, errors::add, () -> onComplete = true);

    // First request: exactly one value, not yet complete.
    sub.request(1L);
    assertThat(values.size(), equalTo(1));
    assertFalse(onComplete);

    // After cancelling, a second request leaves the observed state unchanged.
    sub.cancel();
    sub.request(1L);
    assertThat(values.size(), equalTo(1));
    assertFalse(onComplete);
}
@Test
代码示例来源:origin: aol/cyclops
/**
 * Creates a publisher of the longs 0, 1, 2, ... limited to {@code elements}
 * items, passed through an always-true filter.
 *
 * @param elements maximum number of items to publish
 * @return the publisher
 */
@Override
public Publisher<Long> createPublisher(long elements) {
    ReactiveSeq<Long> naturals = ReactiveSeq.iterate(0L, i -> i + 1L);
    return Spouts.fromSpliterator(naturals.spliterator())
                 .limit(elements)
                 .filter(i -> true);
}
代码示例来源:origin: aol/cyclops
// Wraps the same Spliterator instance twice, printing once to stdout and once to stderr.
@Test
public void replayStream(){
    ReactiveSeq<String> words = of("hello","world");
    ReactiveSeq<String> greetings = words.map(str -> "hello world " + str);

    Spliterator<String> shared = greetings.spliterator();

    ReactiveSeq.fromSpliterator(shared)
               .forEach(System.out::println);
    ReactiveSeq.fromSpliterator(shared)
               .forEach(System.err::println);
}
@Test
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Returns a sequential {@link DoubleStream} view of this sequence.
 *
 * <p>NOTE: if the underlying spliterator is a {@code Spliterator.OfDouble}
 * it is used directly and {@code fn} is ignored; otherwise each element is
 * converted with {@code fn}.
 *
 * @param fn element-to-double conversion used on the non-primitive path
 * @return a sequential DoubleStream
 */
@Override
default DoubleStream mapToDouble(ToDoubleFunction<? super T> fn){
    final Spliterator<T> elements = this.spliterator();
    final boolean alreadyPrimitive = elements instanceof Spliterator.OfDouble;
    if (!alreadyPrimitive) {
        return StreamSupport.stream(elements, false).mapToDouble(fn);
    }
    return StreamSupport.doubleStream((Spliterator.OfDouble) elements, false);
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Returns a sequential {@link IntStream} view of this sequence.
 *
 * <p>NOTE: if the underlying spliterator is a {@code Spliterator.OfInt} it
 * is used directly and {@code fn} is ignored; otherwise each element is
 * converted with {@code fn}.
 *
 * @param fn element-to-int conversion used on the non-primitive path
 * @return a sequential IntStream
 */
@Override
default IntStream mapToInt(ToIntFunction<? super T> fn){
    final Spliterator<T> elements = this.spliterator();
    final boolean alreadyPrimitive = elements instanceof Spliterator.OfInt;
    if (!alreadyPrimitive) {
        return StreamSupport.stream(elements, false).mapToInt(fn);
    }
    return StreamSupport.intStream((Spliterator.OfInt) elements, false);
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Returns a sequential {@link LongStream} view of this sequence.
 *
 * <p>NOTE: if the underlying spliterator is a {@code Spliterator.OfLong} it
 * is used directly and {@code fn} is ignored; otherwise each element is
 * converted with {@code fn}.
 *
 * @param fn element-to-long conversion used on the non-primitive path
 * @return a sequential LongStream
 */
@Override
default LongStream mapToLong(ToLongFunction<? super T> fn){
    final Spliterator<T> elements = this.spliterator();
    final boolean alreadyPrimitive = elements instanceof Spliterator.OfLong;
    if (!alreadyPrimitive) {
        return StreamSupport.stream(elements, false).mapToLong(fn);
    }
    return StreamSupport.longStream((Spliterator.OfLong) elements, false);
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Returns the spliterator of the stream obtained from {@code stream()}.
 *
 * @return a spliterator over this sequence's elements
 */
@Override
default Spliterator<U> spliterator() {
    final Spliterator<U> delegate = stream().spliterator();
    return delegate;
}
代码示例来源:origin: aol/cyclops
// Takes one Spliterator from a push-based stream and wraps it twice, pushing values before each wrap.
@Test
public void replay(){
    ReactiveSubscriber<String> pushable = Spouts.reactiveSubscriber();
    ReactiveSeq<String> greetings = pushable.reactiveStream()
                                            .map(str -> "hello world " + str);
    Spliterator<String> shared = greetings.spliterator();

    // First round: push "hello", complete, then print to stdout.
    pushable.onNext("hello");
    pushable.onComplete();
    ReactiveSeq.fromSpliterator(shared)
               .forEach(System.out::println);

    // Second round: push "world", complete, then print to stderr.
    pushable.onNext("world");
    pushable.onComplete();
    ReactiveSeq.fromSpliterator(shared)
               .forEach(System.err::println);
}
@Test
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Returns a sequence made of {@code other} followed by the source that
 * {@code get()} provides for this sequence.
 *
 * @param other values to emit first
 * @return the concatenated sequence
 */
@Override
public ReactiveSeq<T> prependAll(final T... other) {
    final ReactiveSeq<T> leading = ReactiveSeq.of(other);
    // Argument order determines emission order: leading values first.
    return ReactiveSeq.concat(leading.spliterator(), get());
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Feeds this sequence into a queue and hands a parallel JDK stream over
 * that queue to {@code fn}.
 *
 * <p>The queue is given a continuation: the first invocation of it that
 * succeeds in the compare-and-set drains this sequence's spliterator into
 * the queue and closes the queue in a {@code finally} block. Every
 * invocation returns {@code Continuation.empty()}.
 *
 * @param fn function applied to the parallel stream read from the queue
 * @param <R> result type
 * @return the value produced by {@code fn}
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
    Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                   .build()
                                   .withTimeout(1);

    // Null means no invocation has claimed the drain yet.
    AtomicReference<Continuation> drainClaim = new AtomicReference<>(null);

    Continuation drainOnce = new Continuation(() -> {
        boolean won = drainClaim.get() == null
                && drainClaim.compareAndSet(null, Continuation.empty());
        if (won) {
            try {
                // The first thread to run this continuation pushes the whole Stream onto the Queue.
                this.spliterator().forEachRemaining(queue::offer);
            } finally {
                queue.close();
            }
        }
        return Continuation.empty();
    });

    queue.addContinuation(drainOnce);
    Stream<T> parallelView = queue.jdkStream().parallel();
    return fn.apply(parallelView);
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){
内容来源于网络,如有侵权,请联系作者删除!