本文整理了Java中cyclops.reactive.ReactiveSeq.foldFuture
方法的一些代码示例,展示了ReactiveSeq.foldFuture
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ReactiveSeq.foldFuture
方法的具体详情如下:
包路径:cyclops.reactive.ReactiveSeq
类名称:ReactiveSeq
方法名:foldFuture
暂无
代码示例来源:origin: aol/micro-server
@GET
@Produces("application/json")
@Path("/jobs")
public void activeJobs(@Suspended AsyncResponse asyncResponse) {
try {
ReactiveSeq.of(this.activeJobs)
.map(JobsBeingExecuted::toString)
.foldFuture(WorkerThreads.ioExecutor.get(),
s -> s.forEach(Long.MAX_VALUE, str -> asyncResponse.resume(str)));
} catch (Exception e) {
e.printStackTrace();
}
}
代码示例来源:origin: aol/micro-server
@GET
@Produces("application/json")
@Path("/all-requests")
public void allActiveRequests(@Suspended AsyncResponse asyncResponse) {
ReactiveSeq.of(activeQueries.toString())
.foldFuture(WorkerThreads.ioExecutor.get(),
s->s.forEach(Long.MAX_VALUE,result -> asyncResponse.resume(result)));
}
代码示例来源:origin: aol/micro-server
@GET
@Produces("application/json")
public void mainfest(@Suspended AsyncResponse asyncResponse, @Context ServletContext context) {
ReactiveSeq.of("/META-INF/MANIFEST.MF")
.map(url->context.getResourceAsStream(url))
.map(this::getManifest)
.foldFuture(WorkerThreads.ioExecutor.get(),
s->s.forEach(Long.MAX_VALUE,result->asyncResponse.resume(result)));
}
代码示例来源:origin: aol/micro-server
@GET
@Produces("application/json")
@Path("/requests")
public void activeRequests(@Suspended AsyncResponse asyncResponse, @QueryParam("type") final String type) {
ReactiveSeq.of((type == null ? "default" : type))
.map(typeToUse -> activeQueries.getMap()
.get(typeToUse)
.toString())
.foldFuture(WorkerThreads.ioExecutor.get(),
s->s.forEach(Long.MAX_VALUE,result -> asyncResponse.resume(result)));
}
代码示例来源:origin: aol/micro-server
@GET
@Path("/list")
@Produces("application/json")
public void list(@Context UriInfo uriInfo, @Suspended AsyncResponse response) {
ReactiveSeq.of(this).foldFuture(WorkerThreads.ioExecutor.get(),
s -> s.forEach(Long.MAX_VALUE, next -> {
try {
cleaner.clean();
response.resume(finder.find(UriInfoParser.toRegisterEntry(uriInfo)));
} catch (Exception e) {
logger.error(e.getMessage(), e);
response.resume(Arrays.asList("Bad Request: " + e.getMessage()));
}
}));
}
代码示例来源:origin: aol/micro-server
@POST
@Path("/schedule")
@Consumes("application/json")
@Produces("application/json")
public void schedule(@Suspended AsyncResponse response) {
ReactiveSeq.of(this).foldFuture(WorkerThreads.ioExecutor.get(), s ->
s.forEach(Long.MAX_VALUE, next -> {
try {
job.schedule();
response.resume(HashMapBuilder.of("status", "success"));
} catch (Exception e) {
logger.error(e.getMessage(), e);
response.resume(HashMapBuilder.of("status", "failure"));
}
}));
}
代码示例来源:origin: aol/micro-server
@POST
@Path("/register")
@Consumes("application/json")
@Produces("application/json")
public void register(@Suspended AsyncResponse response, RegisterEntry entry) {
ReactiveSeq.of(this).foldFuture(WorkerThreads.ioExecutor.get(),
s -> s.forEach(Long.MAX_VALUE, next -> {
try {
register.register(entry);
response.resume(HashMapBuilder.of("status", "complete"));
} catch (Exception e) {
logger.error(e.getMessage(), e);
response.resume(HashMapBuilder.of("status", "failure"));
}
}));
}
代码示例来源:origin: aol/micro-server
public void schedule() {
loader.forEach(dl -> {
// run on startup
create(dl).limit(1).foldFuture(executor, s -> s.forEach(Long.MAX_VALUE, l -> {}));
// schedule
create(dl).schedule(dl.getCron(), executor);
});
}
代码示例来源:origin: aol/cyclops
/**
* Create a push based Stream with <b>no backpressure</b> fromm the provided Stream.
* The provided Stream will be executed on the provided executor and pushed to the returned Stream
*
* @param seq Stream to execute and push to a new non-backpressure aware Stream
* @param exec
* @param <T>
* @return
*/
static <T> ReactiveSeq<T> async(Stream<T> seq, Executor exec){
return async(s->{
ReactiveSeq.fromStream(seq).foldFuture(exec,t->{
PushSubscriber<T> local = s;
t.forEach(local::onNext,local::onError,local::onComplete);
return null;
});
});
}
代码示例来源:origin: aol/cyclops
AtomicBoolean complete = new AtomicBoolean();
AtomicLong requested = new AtomicLong(0);
ReactiveSeq.fromStream(seq).foldFuture(exec,t->{
Subscriber<T> local = subscriber.getFuture().join();
Subscription streamSub = t.forEach(0,local::onNext,local::onError,()->{
代码示例来源:origin: aol/cyclops
@Test
public void forEachX(){
Subscription s = ReactiveSeq.of(1,2,3)
.foldFuture(exec,t->t.forEach(2,System.out::println))
.orElse(null);
System.out.println("takeOne batch");
s.request(1);
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void forEachXTest(){
List<Integer> list = new ArrayList<>();
Subscription s = ReactiveSeq.of(1,2,3)
.foldFuture(exec,t->t.forEach(2, i->list.add(i))).toOptional().get();
while(list.size()!=2){
}
assertThat(list,hasItems(1,2));
assertThat(list.size(),equalTo(2));
s.request(1);
assertThat(list,hasItems(1,2,3));
assertThat(list.size(),equalTo(3));
}
代码示例来源:origin: aol/cyclops
@Test @Ignore
public void push(){
ReactiveSubscriber<String> pushable = Spouts.reactiveSubscriber();
ReactiveSeq<String> stream = pushable.reactiveStream();
Executor ex= Executors.newFixedThreadPool(1);
Future<List<String>> list = stream.peek(System.err::println)
.foldFuture(ex,s->s.collect(Collectors.toList()));
pushable.onNext("hello");
pushable.onComplete();
assertThat(list.orElse(new ArrayList<>()).size(),equalTo(1));
}
代码示例来源:origin: aol/cyclops
@Test
public void testMapReduce(){
assertThat(of(1,2,3,4,5).map(it -> it*100).foldFuture(exec,s->s
.foldLeft( (acc,next) -> acc+next))
.orElse(null),is(Option.of(1500)));
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void testMapReduceSeed(){
assertThat(of(1,2,3,4,5).map(it -> it*100)
.foldFuture(exec,s->s.foldLeft( 50,(acc,next) -> acc+next)).get()
,is(Future.ofResult(1550).get()));
}
代码示例来源:origin: com.oath.microservices/micro-events
@GET
@Produces("application/json")
@Path("/all-requests")
public void allActiveRequests(@Suspended AsyncResponse asyncResponse) {
ReactiveSeq.of(activeQueries.toString())
.foldFuture(WorkerThreads.ioExecutor.get(),
s->s.forEach(Long.MAX_VALUE,result -> asyncResponse.resume(result)));
}
代码示例来源:origin: com.oath.microservices/micro-events
@GET
@Produces("application/json")
@Path("/jobs")
public void activeJobs(@Suspended AsyncResponse asyncResponse) {
try {
ReactiveSeq.of(this.activeJobs)
.map(JobsBeingExecuted::toString)
.foldFuture(WorkerThreads.ioExecutor.get(),
s -> s.forEach(Long.MAX_VALUE, str -> asyncResponse.resume(str)));
} catch (Exception e) {
e.printStackTrace();
}
}
代码示例来源:origin: com.oath.microservices/micro-events
@GET
@Produces("application/json")
public void mainfest(@Suspended AsyncResponse asyncResponse, @Context ServletContext context) {
ReactiveSeq.of("/META-INF/MANIFEST.MF")
.map(url->context.getResourceAsStream(url))
.map(this::getManifest)
.foldFuture(WorkerThreads.ioExecutor.get(),
s->s.forEach(Long.MAX_VALUE,result->asyncResponse.resume(result)));
}
代码示例来源:origin: com.oath.microservices/micro-events
@GET
@Produces("application/json")
@Path("/requests")
public void activeRequests(@Suspended AsyncResponse asyncResponse, @QueryParam("type") final String type) {
ReactiveSeq.of((type == null ? "default" : type))
.map(typeToUse -> activeQueries.getMap()
.get(typeToUse)
.toString())
.foldFuture(WorkerThreads.ioExecutor.get(),
s->s.forEach(Long.MAX_VALUE,result -> asyncResponse.resume(result)));
}
代码示例来源:origin: com.oath.microservices/micro-application-register
@GET
@Path("/list")
@Produces("application/json")
public void list(@Context UriInfo uriInfo, @Suspended AsyncResponse response) {
ReactiveSeq.of(this).foldFuture(WorkerThreads.ioExecutor.get(),
s -> s.forEach(Long.MAX_VALUE, next -> {
try {
cleaner.clean();
response.resume(finder.find(UriInfoParser.toRegisterEntry(uriInfo)));
} catch (Exception e) {
logger.error(e.getMessage(), e);
response.resume(Arrays.asList("Bad Request: " + e.getMessage()));
}
}));
}
内容来源于网络,如有侵权,请联系作者删除!