Usage and Code Examples of the cyclops.reactive.ReactiveSeq Class

Reposted by x33g5p2x on 2022-01-29 under Other

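This article collects code examples for the Java class cyclops.reactive.ReactiveSeq and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the ReactiveSeq class:
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq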

Introduction to ReactiveSeq

A powerful streaming interface.
Use the factory methods on this class for performant, synchronous streams.
Use the factory methods on Spouts for asynchronous streaming, with or without back pressure.
Use the factory methods on FutureStream (cyclops-futurestream) for powerful parallel streaming.
Features include: asynchronous execution; scheduling; error handling; retries; duplication; Cartesian operations (e.g. crossJoin, forEach2); subscriptions and fine-grained control; interoperability; parallelism via FutureStream; lazy grouping (by size, time, or state); sliding windows; efficient reversal; foldRight / scanLeft / scanRight; zipping and combining; data insertion and removal; time-based operations (debouncing, onePer, xPer); SQL-style window operations; reduction and partial reduction; mathematical terminal operations; lazy execution; empty handling; cycling / repeating; controlled iteration (forEach); event handling (on next, on error, on complete).
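To make two of the listed features concrete, the sketch below shows what "grouping by size" and "sliding windows" mean, using only JDK collections. It is not the cyclops implementation, and it is eager rather than lazy; the class name WindowingSketch and both helper methods are hypothetical names chosen for illustration. In cyclops the corresponding operators are assumed to be grouped(int) (which appears in an example later in this article) and sliding(int), whose edge-case behavior may differ from this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowingSketch {

    // Split the input into consecutive batches of at most `size` elements.
    public static <T> List<List<T>> grouped(List<T> input, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < input.size(); i += size) {
            int end = Math.min(i + size, input.size());
            batches.add(new ArrayList<>(input.subList(i, end)));
        }
        return batches;
    }

    // Overlapping windows of exactly `size` elements, advancing one element at a time.
    // Assumption of this sketch: inputs shorter than `size` produce no window.
    public static <T> List<List<T>> sliding(List<T> input, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int i = 0; i + size <= input.size(); i++) {
            windows.add(new ArrayList<>(input.subList(i, i + size)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(grouped(data, 2)); // [[1, 2], [3, 4], [5]]
        System.out.println(sliding(data, 3)); // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
    }
}
```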

Code Examples

Code example source: aol/micro-server

default String getJaxWsRsApplication() {
  List<String> jaxRsApplications = ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                                             .stream())
                        .filter(module -> module.jaxWsRsApplication() != null)
                        .map(Plugin::jaxWsRsApplication)
                        .flatMap(Streams::optionalToStream)
                        .toList();
  if (jaxRsApplications.size() > 1) {
    throw new IncorrectJaxRsPluginsException(
                         "ERROR!  Multiple jax-rs application plugins found,  a possible solution is to remove micro-jackson or other jax-rs application provider from your classpath. "
                             + jaxRsApplications);
  } else if (jaxRsApplications.size() == 0) {
    throw new IncorrectJaxRsPluginsException(
                         "ERROR!  No jax-rs application plugins found, a possible solution is to add micro-jackson to your classpath. ");
  }
  return jaxRsApplications.get(0);
}
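The example above follows a common "exactly one result" pattern: filter the candidates, collect them, then fail if there are none or more than one. A minimal sketch of that pattern with plain java.util.stream is shown below, for readers without the micro-server types at hand. The class ExactlyOneSketch, the method exactlyOne, and the exception messages are hypothetical and are not part of cyclops or micro-server.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class ExactlyOneSketch {

    // Returns the single non-null candidate; fails if there are zero or several.
    public static String exactlyOne(List<String> candidates) {
        List<String> found = candidates.stream()
                                       .filter(Objects::nonNull)
                                       .collect(Collectors.toList());
        if (found.size() > 1) {
            throw new IllegalStateException("Multiple candidates found: " + found);
        }
        if (found.isEmpty()) {
            throw new IllegalStateException("No candidate found");
        }
        return found.get(0);
    }

    public static void main(String[] args) {
        // One null entry is filtered out, leaving exactly one candidate.
        System.out.println(exactlyOne(Arrays.asList(null, "com.example.App"))); // com.example.App
    }
}
```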

Code example source: aol/micro-server

@GET
@Produces("application/json")
public void mainfest(@Suspended AsyncResponse asyncResponse, @Context ServletContext context) {
  ReactiveSeq.of("/META-INF/MANIFEST.MF")
        .map(url->context.getResourceAsStream(url))
        .map(this::getManifest)
        .foldFuture(WorkerThreads.ioExecutor.get(),
          s->s.forEach(Long.MAX_VALUE,result->asyncResponse.resume(result)));
}

Code example source: aol/micro-server

public static Optional<RegisterEntry> toRegisterEntry(UriInfo uriInfo) {
  if (uriInfo.getQueryParameters().isEmpty()) {
    return Optional.empty();
  } else {
    MultivaluedMap<String, String> parameters = uriInfo.getQueryParameters();
    RegisterEntry re = RegisterEntry.builder()
      .context(parameters.getFirst("context"))
      .hostname(parameters.getFirst("hostname"))
      .port(toInt(parameters.getFirst("port")))
      .target(parameters.getFirst("target"))
      .externalPort(toInt(parameters.getFirst("externalPort")))
      .module(parameters.getFirst("module"))
      .health(toHealth(parameters.getFirst("health")))
      .build();
    Map<String, String> manifest = ReactiveSeq.fromIterable(parameters.entrySet())
      .filter(e -> e.getKey().startsWith("manifest."))
      .toMap(e -> e.getKey().replace("manifest.", ""),
        e -> parameters.getFirst(e.getKey()));
    re.getManifest().clear();
    re.getManifest().putAll(manifest);
    return Optional.of(re);
  }
}

Code example source: aol/micro-server

private ListX<Plugin> load() {
  return ReactiveSeq.fromIterable(ServiceLoader.load(Plugin.class)).to(ListX::fromIterable);
}

Code example source: aol/micro-server

public MicroserverPlugins(Class... classes){
  ReactiveSeq<Class> rs=  classes!=null ? ReactiveSeq.of(classes) : ReactiveSeq.empty();
  this.classes = rs
    .appendStream(ReactiveSeq.of(new MicroserverApp(true,extractClass(),()->"").classes))
    .toArray(i->new Class[i]);
}

Code example source: aol/micro-server

default String getProviders() {
  String additional = ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                                      .stream())
                  .peek(System.out::println)
                  .filter(module -> module.providers() != null)
                  .concatMap(Plugin::providers)
                  .join(",");
  if (StringUtils.isEmpty(additional))
    return "com.oath.micro.server.rest.providers";
  return "com.oath.micro.server.rest.providers," + additional;
}

Code example source: aol/cyclops

default <R1, R> ImmutableQueue<R> forEach2(Function<? super T, ? extends Iterable<R1>> iterable1,
                      BiFunction<? super T, ? super R1, Boolean> filterFunction,
                      BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {
  return this.concatMap(in-> {
    Iterable<? extends R1> b = iterable1.apply(in);
    return ReactiveSeq.fromIterable(b)
        .filter(in2-> filterFunction.apply(in,in2))
        .map(in2->yieldingFunction.apply(in, in2));
  });
}
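The forEach2 method above is a two-level comprehension: for each outer element it iterates an inner Iterable derived from that element, keeps the pairs accepted by the filter, and maps each surviving pair to a result. The sketch below reproduces that logic eagerly over JDK lists so the evaluation order is easy to follow. ForEach2Sketch is a hypothetical class written for illustration; it returns a List instead of an ImmutableQueue and does not depend on cyclops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ForEach2Sketch {

    // Eager equivalent of a nested loop with a filter and a yield step.
    public static <T, R1, R> List<R> forEach2(List<T> source,
                                              Function<? super T, ? extends Iterable<R1>> inner,
                                              BiFunction<? super T, ? super R1, Boolean> filter,
                                              BiFunction<? super T, ? super R1, ? extends R> yield) {
        List<R> result = new ArrayList<>();
        for (T outer : source) {
            for (R1 in : inner.apply(outer)) {
                if (filter.apply(outer, in)) {
                    result.add(yield.apply(outer, in));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Pair each of 1, 2, 3 with 10 and 20, skip the outer value 2, and sum each pair.
        List<Integer> sums = ForEach2Sketch.<Integer, Integer, Integer>forEach2(
                Arrays.asList(1, 2, 3),
                a -> Arrays.asList(10, 20),
                (a, b) -> a != 2,
                (a, b) -> a + b);
        System.out.println(sums); // [11, 21, 13, 23]
    }
}
```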

Code example source: aol/cyclops

@Test
public void batchBySize() {
  iterate("", last -> "next")
      .limit(100)
      .grouped(10)
      .onePer(1, TimeUnit.MICROSECONDS)
      .peek(batch -> System.out.println("batched : " + batch))
      .concatMap(Function.identity())
      .peek(individual -> System.out.println("Flattened : "
          + individual))
      .forEach(a->{});
}

Code example source: aol/cyclops

default <R1, R2, R> ImmutableQueue<R> forEach3(Function<? super T, ? extends Iterable<R1>> iterable1,
                        BiFunction<? super T, ? super R1, ? extends Iterable<R2>> iterable2,
                        Function3<? super T, ? super R1, ? super R2, ? extends R> yieldingFunction) {
  return this.concatMap(in -> {
    Iterable<R1> a = iterable1.apply(in);
    return ReactiveSeq.fromIterable(a)
        .flatMap(ina -> {
          ReactiveSeq<R2> b = ReactiveSeq.fromIterable(iterable2.apply(in, ina));
          return b.map(in2 -> yieldingFunction.apply(in, ina, in2));
        });
  });
}

Code example source: aol/micro-server

private ReactiveSeq<SystemData<String, String>> create(DataLoader dl) {
  return ReactiveSeq.generate(() -> 1)
    .filter(in -> condition.shouldLoad())
    .map(in -> dl.scheduleAndLog())
    .peek(sd -> bus.post(sd));
}

Code example source: aol/micro-server

default Map<String, Filter> getFilters(ServerData data) {
  Map<String, Filter> map = new HashMap<>();
  ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                            .stream())
        .filter(module -> module.filters() != null)
        .map(module -> module.filters()
                  .apply(data))
        .forEach(pluginMap -> map.putAll(pluginMap));
  return MapX.fromMap(map);
}

Code example source: aol/cyclops

default  ReactiveSeq<E> stream(E e){
  return ReactiveSeq.range(fromEnum(e),Integer.MAX_VALUE).map(this::toEnum)
      .takeWhile(Option::isPresent)
        .filter(Option::isPresent).flatMap(Option::stream);
}

Code example source: aol/micro-server

default List<ServletRequestListener> getRequestListeners(ServerData data) {
  return PluginLoader.INSTANCE.plugins.get()
                    .stream()
                    .filter(module -> module.servletRequestListeners() != null)
                    .concatMap(Plugin::servletRequestListeners)
                    .map(fn -> fn.apply(data))
                     .to(ListX::fromIterable);
}

Code example source: aol/cyclops

@Test
public void flatMapStreamFilter() {
  assertThat(of(1, 2, 3).flatMap(i -> ReactiveSeq.of(i).filter(Objects::nonNull))
          .collect(Collectors.toList()),
      Matchers.equalTo(Arrays.asList(1, 2, 3)));
}

Code example source: aol/cyclops

@Test
public void concatMapStream() {
  assertThat(of(1, 2, 3).concatMap(i -> ReactiveSeq.of(i).filter(Objects::nonNull))
          .collect(Collectors.toList()),
      Matchers.equalTo(Arrays.asList(1, 2, 3)));
}

Code example source: aol/micro-server

public SpringContextFactory(Config config, Class<?> c, Set<Class<?>> classes) {
  PersistentSet<Class> s = config.getClasses()
          .plusAll(classes);
  s= s.plus(c);
  Microserver microserver = c.getAnnotation(Microserver.class);
  final PersistentSet<Class> immutableS = s;
  s = Optional.ofNullable(microserver)
        .flatMap(ms -> Optional.ofNullable(ms.blacklistedClasses()))
        .map(bl -> {
          Set<Class> blacklistedClasses = Arrays.stream(bl)
                             .collect(Collectors.toSet());
          return (PersistentSet<Class>)immutableS.stream()
                   .filter(clazz -> !blacklistedClasses.contains(clazz)).hashSet();
        })
        .orElse(immutableS);
  this.classes = s;
  this.config = config;
  springBuilder = ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                                    .stream())
                .filter(m -> m.springBuilder() != null)
                .map(Plugin::springBuilder)
                .findFirst()
                .orElse(new SpringApplicationConfigurator());
}

Code example source: aol/cyclops

@Test
public void forEachXWithErrorExample(){
  Subscription s = ReactiveSeq.of(10,20,30)
                .map(this::process)
                .forEach( 2,System.out::println, System.err::println);
  System.out.println("Completed 2");
  s.request(1);
  System.out.println("Finished all!");
}
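The test above consumes two elements up front and then asks for one more through the returned Subscription. The sketch below illustrates that demand-driven idea with a small pull-based subscription over an Iterator. It is an assumption-laden illustration, not the cyclops or Reactive Streams implementation: RequestSketch, PullSubscription, and this forEach helper are hypothetical names, and the sketch is purely synchronous.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class RequestSketch {

    // Delivers elements only when the caller asks for them via request(n).
    public static final class PullSubscription<T> {
        private final Iterator<T> source;
        private final Consumer<? super T> onNext;
        private final Consumer<? super Throwable> onError;

        PullSubscription(Iterator<T> source,
                         Consumer<? super T> onNext,
                         Consumer<? super Throwable> onError) {
            this.source = source;
            this.onNext = onNext;
            this.onError = onError;
        }

        // Push up to n further elements to onNext; route runtime failures to onError.
        public void request(long n) {
            for (long i = 0; i < n && source.hasNext(); i++) {
                try {
                    onNext.accept(source.next());
                } catch (RuntimeException e) {
                    onError.accept(e);
                }
            }
        }
    }

    // Start consuming `initial` elements and hand back the subscription for later demand.
    public static <T> PullSubscription<T> forEach(long initial,
                                                  Iterable<T> source,
                                                  Consumer<? super T> onNext,
                                                  Consumer<? super Throwable> onError) {
        PullSubscription<T> subscription =
                new PullSubscription<>(source.iterator(), onNext, onError);
        subscription.request(initial);
        return subscription;
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        PullSubscription<Integer> s =
                forEach(2, Arrays.asList(10, 20, 30), seen::add, System.err::println);
        System.out.println(seen); // [10, 20]
        s.request(1);
        System.out.println(seen); // [10, 20, 30]
    }
}
```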

Code example source: aol/cyclops

@Test
public void coflatMapTest(){
  ReactiveSeq<ReactiveSeq<Integer>> stream = ReactiveSeq.fromIterable(Arrays.asList(1,2,3))
                              .coflatMap(s -> s);
  ReactiveSeq<Integer> stream2 = stream.flatMap(s -> s).map(i -> i * 10);
  ReactiveSeq<Integer> stream3 = stream.flatMap(s -> s).map(i -> i * 100);
  assertThat(stream2.toList(),equalTo(Arrays.asList(10,20,30)));
  assertThat(stream3.toList(),equalTo(Arrays.asList(100,200,300)));
}

Code example source: aol/micro-server

public ReactiveSeq<Tuple2<String,String>> extractResources() {
  return resources.stream().peek(resource -> logMissingPath(resource))
              .filter(resource-> resource.getClass().getAnnotation(Path.class)!=null)
              .map(resource -> Tuple.tuple(resource.getClass().getName(),
                  resource.getClass().getAnnotation(Path.class).value()));
}

Code example source: aol/micro-server

@GET
@Produces("application/json")
@Path("/all-requests")
public void allActiveRequests(@Suspended AsyncResponse asyncResponse) {
  ReactiveSeq.of(activeQueries.toString())
        .foldFuture(WorkerThreads.ioExecutor.get(),
        s->s.forEach(Long.MAX_VALUE,result -> asyncResponse.resume(result)));
}
