本文整理了Java中cyclops.reactive.ReactiveSeq.multicast
方法的一些代码示例,展示了ReactiveSeq.multicast
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ReactiveSeq.multicast
方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:multicast
[英]Broadcast the contents of this Stream to multiple downstream Streams (determined by supplier parameter). For pull based Streams this Stream will be buffered. For push based Streams elements are broadcast downstream on receipt, and the emitted downstream Streams remain asynchronous. This contrasts with ReactiveSeq#duplicate, ReactiveSeq#triplicate and ReactiveSeq#quadruplicate(), which buffer all Stream types and produce a synchronous downstream stream.
[中]将此流的内容广播到多个下游流(由 supplier 参数确定)。对于基于拉取的流,该流将被缓冲。对于基于推送的流,元素在接收时即向下游广播,发出的下游流保持异步。这与 ReactiveSeq#duplicate、ReactiveSeq#triplicate 和 ReactiveSeq#quadruplicate() 形成对比,后者会缓冲所有类型的流并产生同步的下游流。
代码示例来源:origin: aol/cyclops
/**
 * Fans this stream out into two branches and zips the transformed branches back together.
 * <p>
 * The branches come from {@code multicast(2)}. {@code path1} is applied to the branch at
 * index 0 and then {@code path2} to the branch at index 1; {@code empty()} is passed as the
 * {@code getOrElse} fallback for each index.
 *
 * @param path1 transformation applied to the first branch
 * @param path2 transformation applied to the second branch
 * @param zipFn function handed to {@code zip} to combine elements of the two transformed branches
 * @return the result of zipping the first transformed branch with the second using {@code zipFn}
 */
default <R1,R2,R3> ReactiveSeq<R3> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
                                               Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R2>> path2,
                                               BiFunction<? super R1, ? super R2, ? extends R3> zipFn){
    Seq<ReactiveSeq<T>> branches = multicast(2);
    ReactiveSeq<? extends R1> first = path1.apply(branches.getOrElse(0,empty()));
    ReactiveSeq<? extends R2> second = path2.apply(branches.getOrElse(1,empty()));
    return first.zip(second,zipFn);
}
default <R1,R2,R3> ReactiveSeq<R3> parallelFanOutZipIn(ForkJoinPool fj, Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Fans this stream out into two branches, transforms each and merges the results.
 * <p>
 * The branches come from {@code multicast(2)}; {@code empty()} is passed as the
 * {@code getOrElse} fallback for each index. Note that {@code path2} is applied before
 * {@code path1}.
 *
 * @param path1 transformation applied to the branch at index 0; its result is the merge receiver
 * @param path2 transformation applied to the branch at index 1; its result is merged in as a Publisher
 * @return the first transformed branch merged with the second via {@code mergeP}
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2){
    Seq<ReactiveSeq<T>> branches = multicast(2);

    // The second path is evaluated first, then the first path.
    ReactiveSeq<T> secondBranch = branches.getOrElse(1,empty());
    Publisher<R> secondResult = (Publisher<R>)path2.apply(secondBranch);

    ReactiveSeq<T> firstBranch = branches.getOrElse(0,empty());
    ReactiveSeq<R> firstResult = (ReactiveSeq<R>)path1.apply(firstBranch);

    return firstResult.mergeP(secondResult);
}
代码示例来源:origin: aol/cyclops
/**
 * Fans this stream out into three branches, transforms each and merges the results.
 * <p>
 * The branches come from {@code multicast(3)}; {@code empty()} is passed as the
 * {@code getOrElse} fallback for each index. The paths are applied in the order
 * {@code path2}, {@code path3}, {@code path1}.
 *
 * @param path1 transformation applied to the branch at index 0; its result is the merge receiver
 * @param path2 transformation applied to the branch at index 1
 * @param path3 transformation applied to the branch at index 2
 * @return the first transformed branch merged with the other two via {@code mergeP}
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3){
    Seq<ReactiveSeq<T>> branches = multicast(3);

    // Secondary paths are evaluated before the first path.
    ReactiveSeq<T> secondBranch = branches.getOrElse(1,empty());
    Publisher<R> secondResult = (Publisher<R>)path2.apply(secondBranch);

    ReactiveSeq<T> thirdBranch = branches.getOrElse(2,empty());
    Publisher<R> thirdResult = (Publisher<R>)path3.apply(thirdBranch);

    ReactiveSeq<T> firstBranch = branches.getOrElse(0,empty());
    ReactiveSeq<R> firstResult = (ReactiveSeq<R>)path1.apply(firstBranch);

    return firstResult.mergeP(secondResult,thirdResult);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Fans this stream out into three branches and zips the transformed branches back together.
 * <p>
 * The branches come from {@code multicast(3)}. {@code path1}, {@code path2} and {@code path3}
 * are applied, in that order, to the branches at indices 0, 1 and 2; {@code empty()} is passed
 * as the {@code getOrElse} fallback for each index.
 *
 * @param path1 transformation applied to the first branch
 * @param path2 transformation applied to the second branch
 * @param path3 transformation applied to the third branch
 * @param zipFn function handed to {@code zip3} to combine elements of the three transformed branches
 * @return the result of {@code zip3} over the three transformed branches
 */
default <R1,R2,R3,R4> ReactiveSeq<R4> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
                                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R2>> path2,
                                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R3>> path3,
                                                  Function3<? super R1, ? super R2, ? super R3, ? extends R4> zipFn){
    Seq<ReactiveSeq<T>> branches = multicast(3);
    ReactiveSeq<? extends R1> first = path1.apply(branches.getOrElse(0,empty()));
    ReactiveSeq<? extends R2> second = path2.apply(branches.getOrElse(1,empty()));
    ReactiveSeq<? extends R3> third = path3.apply(branches.getOrElse(2,empty()));
    return first.zip3(second,third,zipFn);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Fans this stream out into four branches, transforms each and merges the results.
 * <p>
 * The branches come from {@code multicast(4)}; {@code empty()} is passed as the
 * {@code getOrElse} fallback for each index. The paths are applied in the order
 * {@code path2}, {@code path3}, {@code path4}, {@code path1}.
 *
 * @param path1 transformation applied to the branch at index 0; its result is the merge receiver
 * @param path2 transformation applied to the branch at index 1
 * @param path3 transformation applied to the branch at index 2
 * @param path4 transformation applied to the branch at index 3
 * @return the first transformed branch merged with the other three via {@code mergeP}
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path4){
    Seq<ReactiveSeq<T>> branches = multicast(4);

    // Secondary paths are evaluated before the first path.
    ReactiveSeq<T> secondBranch = branches.getOrElse(1,empty());
    Publisher<R> secondResult = (Publisher<R>)path2.apply(secondBranch);

    ReactiveSeq<T> thirdBranch = branches.getOrElse(2,empty());
    Publisher<R> thirdResult = (Publisher<R>)path3.apply(thirdBranch);

    ReactiveSeq<T> fourthBranch = branches.getOrElse(3,empty());
    Publisher<R> fourthResult = (Publisher<R>)path4.apply(fourthBranch);

    ReactiveSeq<T> firstBranch = branches.getOrElse(0,empty());
    ReactiveSeq<R> firstResult = (ReactiveSeq<R>)path1.apply(firstBranch);

    return firstResult.mergeP(secondResult,thirdResult,fourthResult);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
代码示例来源:origin: aol/cyclops
/**
 * Fans this stream out into four branches and zips the transformed branches back together.
 * <p>
 * The branches come from {@code multicast(4)}. {@code path1} through {@code path4} are
 * applied, in that order, to the branches at indices 0 through 3; {@code empty()} is passed
 * as the {@code getOrElse} fallback for each index.
 *
 * @param path1 transformation applied to the first branch
 * @param path2 transformation applied to the second branch
 * @param path3 transformation applied to the third branch
 * @param path4 transformation applied to the fourth branch
 * @param zipFn function handed to {@code zip4} to combine elements of the four transformed branches
 * @return the result of {@code zip4} over the four transformed branches
 */
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
                                                     Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R2>> path2,
                                                     Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R3>> path3,
                                                     Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R4>> path4,
                                                     Function4<? super R1, ? super R2, ? super R3, ? super R4, ? extends R5> zipFn){
    Seq<ReactiveSeq<T>> branches = multicast(4);
    ReactiveSeq<? extends R1> first = path1.apply(branches.getOrElse(0,empty()));
    ReactiveSeq<? extends R2> second = path2.apply(branches.getOrElse(1,empty()));
    ReactiveSeq<? extends R3> third = path3.apply(branches.getOrElse(2,empty()));
    ReactiveSeq<? extends R4> fourth = path4.apply(branches.getOrElse(3,empty()));
    return first.zip4(second,third,fourth,zipFn);
}
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Delegates to {@code multicast} on the stream returned by {@code stream()}.
 *
 * @param num value forwarded unchanged to the delegate's {@code multicast}
 * @return whatever the delegate's {@code multicast(num)} returns
 */
@Override
default Seq<ReactiveSeq<U>> multicast(int num) {
    Seq<ReactiveSeq<U>> downstream = stream().multicast(num);
    return downstream;
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Splits this stream in two with {@code multicast(2)}, runs each copy through its own
 * path and zips the two results.
 * <p>
 * Copy 0 goes through {@code path1} first, then copy 1 goes through {@code path2}.
 * {@code empty()} is the {@code getOrElse} fallback for each copy.
 *
 * @param path1 transformation for copy 0
 * @param path2 transformation for copy 1
 * @param zipFn combiner passed to {@code zip}
 * @return {@code path1}'s output zipped with {@code path2}'s output using {@code zipFn}
 */
default <R1,R2,R3> ReactiveSeq<R3> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
                                               Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R2>> path2,
                                               BiFunction<? super R1, ? super R2, ? extends R3> zipFn){
    Seq<ReactiveSeq<T>> copies = multicast(2);

    ReactiveSeq<T> leftSource = copies.getOrElse(0,empty());
    ReactiveSeq<? extends R1> left = path1.apply(leftSource);

    ReactiveSeq<T> rightSource = copies.getOrElse(1,empty());
    ReactiveSeq<? extends R2> right = path2.apply(rightSource);

    return left.zip(right,zipFn);
}
default <R1,R2,R3> ReactiveSeq<R3> parallelFanOutZipIn(ForkJoinPool fj, Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Splits this stream in two with {@code multicast(2)}, runs each copy through its own
 * path and merges the outputs with {@code mergeP}.
 * <p>
 * {@code path2} is invoked before {@code path1}. {@code empty()} is the {@code getOrElse}
 * fallback for each copy.
 *
 * @param path1 transformation for copy 0; its output is the stream {@code mergeP} is called on
 * @param path2 transformation for copy 1; its output is passed to {@code mergeP} as a Publisher
 * @return the merged stream
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2){
    Seq<ReactiveSeq<T>> copies = multicast(2);

    // Build the merged-in side first, then the receiver side.
    ReactiveSeq<T> copyOne = copies.getOrElse(1,empty());
    Publisher<R> mergedIn = (Publisher<R>)path2.apply(copyOne);

    ReactiveSeq<T> copyZero = copies.getOrElse(0,empty());
    ReactiveSeq<R> receiver = (ReactiveSeq<R>)path1.apply(copyZero);

    return receiver.mergeP(mergedIn);
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Splits this stream in three with {@code multicast(3)}, runs each copy through its own
 * path and combines the three outputs with {@code zip3}.
 * <p>
 * Copies 0, 1 and 2 go through {@code path1}, {@code path2} and {@code path3} in that order.
 * {@code empty()} is the {@code getOrElse} fallback for each copy.
 *
 * @param path1 transformation for copy 0
 * @param path2 transformation for copy 1
 * @param path3 transformation for copy 2
 * @param zipFn combiner passed to {@code zip3}
 * @return the {@code zip3} of the three transformed copies
 */
default <R1,R2,R3,R4> ReactiveSeq<R4> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
                                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R2>> path2,
                                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R3>> path3,
                                                  Function3<? super R1, ? super R2, ? super R3, ? extends R4> zipFn){
    Seq<ReactiveSeq<T>> copies = multicast(3);

    ReactiveSeq<T> sourceA = copies.getOrElse(0,empty());
    ReactiveSeq<? extends R1> outA = path1.apply(sourceA);

    ReactiveSeq<T> sourceB = copies.getOrElse(1,empty());
    ReactiveSeq<? extends R2> outB = path2.apply(sourceB);

    ReactiveSeq<T> sourceC = copies.getOrElse(2,empty());
    ReactiveSeq<? extends R3> outC = path3.apply(sourceC);

    return outA.zip3(outB,outC,zipFn);
}
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Splits this stream in three with {@code multicast(3)}, runs each copy through its own
 * path and merges the outputs with {@code mergeP}.
 * <p>
 * Invocation order is {@code path2}, {@code path3}, then {@code path1}. {@code empty()} is
 * the {@code getOrElse} fallback for each copy.
 *
 * @param path1 transformation for copy 0; its output is the stream {@code mergeP} is called on
 * @param path2 transformation for copy 1
 * @param path3 transformation for copy 2
 * @return the merged stream
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3){
    Seq<ReactiveSeq<T>> copies = multicast(3);

    // Build the merged-in sides first, then the receiver side.
    ReactiveSeq<T> copyOne = copies.getOrElse(1,empty());
    Publisher<R> mergedInA = (Publisher<R>)path2.apply(copyOne);

    ReactiveSeq<T> copyTwo = copies.getOrElse(2,empty());
    Publisher<R> mergedInB = (Publisher<R>)path3.apply(copyTwo);

    ReactiveSeq<T> copyZero = copies.getOrElse(0,empty());
    ReactiveSeq<R> receiver = (ReactiveSeq<R>)path1.apply(copyZero);

    return receiver.mergeP(mergedInA,mergedInB);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Splits this stream in four with {@code multicast(4)}, runs each copy through its own
 * path and merges the outputs with {@code mergeP}.
 * <p>
 * Invocation order is {@code path2}, {@code path3}, {@code path4}, then {@code path1}.
 * {@code empty()} is the {@code getOrElse} fallback for each copy.
 *
 * @param path1 transformation for copy 0; its output is the stream {@code mergeP} is called on
 * @param path2 transformation for copy 1
 * @param path3 transformation for copy 2
 * @param path4 transformation for copy 3
 * @return the merged stream
 */
default <R> ReactiveSeq<R> fanOut(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path1,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path2,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path3,
                                  Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R>> path4){
    Seq<ReactiveSeq<T>> copies = multicast(4);

    // Build the merged-in sides first, then the receiver side.
    ReactiveSeq<T> copyOne = copies.getOrElse(1,empty());
    Publisher<R> mergedInA = (Publisher<R>)path2.apply(copyOne);

    ReactiveSeq<T> copyTwo = copies.getOrElse(2,empty());
    Publisher<R> mergedInB = (Publisher<R>)path3.apply(copyTwo);

    ReactiveSeq<T> copyThree = copies.getOrElse(3,empty());
    Publisher<R> mergedInC = (Publisher<R>)path4.apply(copyThree);

    ReactiveSeq<T> copyZero = copies.getOrElse(0,empty());
    ReactiveSeq<R> receiver = (ReactiveSeq<R>)path1.apply(copyZero);

    return receiver.mergeP(mergedInA,mergedInB,mergedInC);
}
default <R> ReactiveSeq<R> parallelFanOut(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R>> path1,
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Splits this stream in four with {@code multicast(4)}, runs each copy through its own
 * path and combines the four outputs with {@code zip4}.
 * <p>
 * Copies 0 through 3 go through {@code path1} through {@code path4} in that order.
 * {@code empty()} is the {@code getOrElse} fallback for each copy.
 *
 * @param path1 transformation for copy 0
 * @param path2 transformation for copy 1
 * @param path3 transformation for copy 2
 * @param path4 transformation for copy 3
 * @param zipFn combiner passed to {@code zip4}
 * @return the {@code zip4} of the four transformed copies
 */
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> fanOutZipIn(Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R1>> path1,
                                                     Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R2>> path2,
                                                     Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R3>> path3,
                                                     Function<? super ReactiveSeq<T>, ? extends ReactiveSeq<? extends R4>> path4,
                                                     Function4<? super R1, ? super R2, ? super R3, ? super R4, ? extends R5> zipFn){
    Seq<ReactiveSeq<T>> copies = multicast(4);

    ReactiveSeq<T> sourceA = copies.getOrElse(0,empty());
    ReactiveSeq<? extends R1> outA = path1.apply(sourceA);

    ReactiveSeq<T> sourceB = copies.getOrElse(1,empty());
    ReactiveSeq<? extends R2> outB = path2.apply(sourceB);

    ReactiveSeq<T> sourceC = copies.getOrElse(2,empty());
    ReactiveSeq<? extends R3> outC = path3.apply(sourceC);

    ReactiveSeq<T> sourceD = copies.getOrElse(3,empty());
    ReactiveSeq<? extends R4> outD = path4.apply(sourceD);

    return outA.zip4(outB,outC,outD,zipFn);
}
default <R1,R2,R3,R4,R5> ReactiveSeq<R5> parallelFanOutZipIn(ForkJoinPool fj,Function<? super Stream<T>, ? extends Stream<? extends R1>> path1,
内容来源于网络,如有侵权,请联系作者删除!