Using the cyclops.reactive.ReactiveSeq.forEach() method, with code examples

x33g5p2x, reposted on 2022-01-29 under Other

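This article collects code examples for the Java method cyclops.reactive.ReactiveSeq.forEach and shows how ReactiveSeq.forEach is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as useful reference material. Details of the ReactiveSeq.forEach method:
Package path: cyclops.reactive.ReactiveSeq
Class name: ReactiveSeq
Method name: forEach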

About ReactiveSeq.forEach

Performs an action for each element of this Stream. For potentially non-blocking analogs, see ReactiveSeq#forEachAsync(Consumer) and forEach overloads such as ReactiveSeq#forEach(Consumer,Consumer) and ReactiveSeq#forEach(Consumer,Consumer,Runnable). This method overrides the JDK java.util.stream.Stream#forEach(Consumer) and maintains its blocking semantics. Other forEach overloads in ReactiveSeq are non-blocking for asynchronously executing Streams.

This is a terminal operation.
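To make the blocking behaviour concrete, here is a minimal sketch using the JDK's own java.util.stream.Stream#forEach, whose semantics the description says ReactiveSeq.forEach maintains. It does not call cyclops at all, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class BlockingForEachSketch {

    // Hypothetical helper: returns the elements seen by the time forEach returns.
    static List<Integer> collectWithForEach() {
        List<Integer> seen = new ArrayList<>();
        // Terminal and blocking on a sequential stream: the call returns only
        // after the action has run for every element.
        Stream.of(1, 2, 3).forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectWithForEach().size());
    }
}
```

Because the call blocks, the caller can read the list immediately afterwards without sleeping or waiting on a signal.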

The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.
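The ordering and synchronization caveats can be sketched with a plain JDK parallel stream. This is an illustration under the assumption that the JDK behaviour described above carries over; it does not exercise cyclops, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

final class ParallelForEachSketch {

    // The action touches shared state, so it uses an atomic accumulator:
    // forEach on a parallel stream may run the action on several threads.
    static long sumInParallel(int n) {
        AtomicLong total = new AtomicLong();
        IntStream.rangeClosed(1, n).parallel().forEach(total::addAndGet);
        return total.get();
    }

    // forEachOrdered trades some parallelism for encounter order.
    static List<Integer> inEncounterOrder(int n) {
        List<Integer> out = new CopyOnWriteArrayList<>();
        IntStream.rangeClosed(1, n).parallel().boxed().forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sumInParallel(1000));
        System.out.println(inEncounterOrder(5));
    }
}
```

The sum is the same on every run because addition is order-independent and the accumulator is thread-safe. Appending to a list from a plain parallel forEach would give no such ordering guarantee.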

Code examples

Code example source: aol/micro-server

default Map<String, Filter> getFilters(ServerData data) {
  Map<String, Filter> map = new HashMap<>();
  ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                            .stream())
        .filter(module -> module.filters() != null)
        .map(module -> module.filters()
                  .apply(data))
        .forEach(pluginMap -> map.putAll(pluginMap));
  return MapX.fromMap(map);
}
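The example above fills a HashMap from inside forEach. As a point of comparison, the sketch below performs the same kind of merge with a collector on a plain JDK stream. It assumes that later maps should override earlier keys, as putAll does. The class and method names are hypothetical, and nothing here is micro-server API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class MergeMapsSketch {

    // Hypothetical helper: merges the maps in list order, last writer wins.
    static <K, V> Map<K, V> merge(List<Map<K, V>> maps) {
        return maps.stream()
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (older, newer) -> newer, // same outcome as successive putAll calls
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(Map.of("a", 1), Map.of("a", 2, "b", 3))));
    }
}
```

One difference to keep in mind: Collectors.toMap rejects null values, whereas HashMap.putAll accepts them, so the forEach form may still be the better fit when plugin maps can contain nulls.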

Code example source: aol/micro-server

default Map<String, Servlet> getServlets(ServerData data) {
  Map<String, Servlet> map = new HashMap<>();
  ReactiveSeq.fromStream(PluginLoader.INSTANCE.plugins.get()
                            .stream())
        .filter(module -> module.servlets() != null)
        .map(module -> module.servlets()
                  .apply(data))
        .forEach(pluginMap -> map.putAll(pluginMap));
  return MapX.fromMap(map);
}

Code example source: aol/micro-server

public List<Thread> run() {
  register.ifPresent( reg -> 
    reg.register(
      apps.stream().map(app -> app.getServerData())
        .collect(Collectors.toList())
        .toArray(new ServerData[0])));
  Map<ServerApplication,CompletableFuture> mapFutures = new HashMap<>();
  apps.stream().forEach(app -> mapFutures.put(app,new CompletableFuture()));
  
  List<Thread> threads = apps.stream().map(app -> start(app, app.getServerData().getModule(),mapFutures.get(app))).collect(Collectors.toList());
  mapFutures.values().forEach(future -> get(future));
  
  logger.info("Started {} Rest applications ", apps.size());
  return threads;
}

Code example source: aol/micro-server

@Bean
public Properties propertyFactory() throws IOException {
  List<Resource> resources = loadPropertyResource();
  PropertiesFactoryBean factory = new PropertiesFactoryBean();
  factory.setLocations(resources.toArray(new Resource[resources.size()]));
  factory.afterPropertiesSet();
  Properties props = factory.getObject();
  new ConfigAccessor().get()
            .getProperties()
            .stream()
            .forEach(e -> {
              if (props.getProperty(e._1()) == null) {
                props.put(e._1(), e._2());
              }
            });
  System.getProperties()
     .entrySet()
     .forEach(e -> props.put(e.getKey(), e.getValue()));
  return props;
}

Code example source: aol/micro-server

private void startServer(WebappContext webappContext, HttpServer httpServer, CompletableFuture start, CompletableFuture end) {
  webappContext.deploy(httpServer);
  try {
    logger.info("Starting application {} on port {}", serverData.getModule().getContext(), serverData.getPort());
    logger.info("Browse to http://localhost:{}/{}/application.wadl", serverData.getPort(), serverData.getModule().getContext());
    logger.info("Configured resource classes :-");
    serverData.extractResources()
        .forEach(t -> logger.info(t._1() + " : " + "http://localhost:" + serverData.getPort() + "/" + serverData.getModule().getContext() + t._2()));
    httpServer.start();
    start.complete(true);
    end.get();
  } catch (IOException e) {
    throw ExceptionSoftener.throwSoftenedException(e);
  } catch (ExecutionException e) {
    throw ExceptionSoftener.throwSoftenedException(e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw ExceptionSoftener.throwSoftenedException(e);
  } finally {
    httpServer.stop();
  }
}

Code example source: aol/micro-server

@Override
public ConfigurableApplicationContext createSpringApp(Config config, Class... classes) {
  logger.debug("Configuring Spring");
  AnnotationConfigWebApplicationContext rootContext = new AnnotationConfigWebApplicationContext();
  rootContext.setAllowCircularReferences(config.isAllowCircularReferences());
  rootContext.register(classes);
  rootContext.scan(config.getBasePackages());
  rootContext.refresh();
  logger.debug("Configuring Additional Spring Beans");
  ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext) rootContext).getBeanFactory();
  config.getDataSources()
     .stream()
      .map(Tuple2::_1)
     .filter(it -> !new ConfigAccessor().get()
                       .getDefaultDataSourceName()
                       .equals(it))
     .forEach(name -> {
       List<SpringDBConfig> dbConfig = getConfig(config, rootContext, beanFactory);
       dbConfig.forEach(spring -> spring.createSpringApp(name));
     });
  logger.debug("Finished Configuring Spring");
  return rootContext;
}

Code example source: aol/cyclops

public ValueEmittingSpliterator(long est, int additionalCharacteristics,ReactiveSeq<T> seq) {
  super(
     est, additionalCharacteristics & Spliterator.ORDERED);
  seq.forEach(e->value.set(e));
}

Code example source: aol/cyclops

@Test
public void forEachXWithErrorExample(){
  Subscription s = ReactiveSeq.of(10,20,30)
                .map(this::process)
                .forEach( 2,System.out::println, System.err::println);
  System.out.println("Completed 2");
  s.request(1);
  System.out.println("Finished all!");
}

Code example source: aol/cyclops

protected  ReactiveSeq<Long> rangeLong(long start,long end){
  return Spouts.async(s->{
    Thread t = new Thread(()-> {
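      // Assumed fix: qualify the call, since an unqualified rangeLong(...) resolves to this method and recurses
      ReactiveSeq.rangeLong(start,end).forEach(s::onNext);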
      s.onComplete();
    });
    t.start();
  });
}
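The snippet above pushes elements to a subscriber from a separate thread and then signals completion. A rough JDK-only analogue, assuming java.util.concurrent.SubmissionPublisher stands in for cyclops' Spouts.async, might look like the following. The class and method names are hypothetical, and the five-second timeout is an arbitrary choice.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

final class AsyncRangeSketch {

    // Hypothetical helper: emits [start, end) from a producer thread and
    // returns what the subscriber received once completion was signalled.
    static List<Long> emitFromThread(long start, long end) {
        List<Long> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Long> publisher = new SubmissionPublisher<>();
        // consume() subscribes right away; the future completes on onComplete.
        CompletableFuture<Void> done = publisher.consume(received::add);
        Thread producer = new Thread(() -> {
            LongStream.range(start, end).forEach(publisher::submit);
            publisher.close(); // signals onComplete to the subscriber
        });
        producer.start();
        try {
            done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("stream did not complete", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(emitFromThread(0, 5));
    }
}
```

Waiting on the completion future plays the role that Thread.sleep plays in some of the tests on this page, but without guessing how long the producer needs.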

Code example source: aol/cyclops

@Test
public void forEachXTest() throws InterruptedException {
  List<Integer> list = new ArrayList<>();
  Subscription s = of(1,2,3).forEach( 2, i->list.add(i));
  Thread.sleep(100);
  assertThat(list,hasItems(1,2,3));
  assertThat(list.size(),equalTo(3));
}

Code example source: aol/cyclops

@Test
public void forEach() throws InterruptedException {
  List<Integer> list = new ArrayList<>();
  of(1,5,3,4,2).forEach(it-> list.add(it));
  Thread.sleep(500L);
  assertThat(list,hasItem(1));
  assertThat(list,hasItem(2));
  assertThat(list,hasItem(3));
  assertThat(list,hasItem(4));
  assertThat(list,hasItem(5));
}

Code example source: aol/cyclops

@Test
public void forEachXTestIsComplete(){
  List<Integer> list = new ArrayList<>();
  Subscription s = ReactiveSeq.of(1,2,3).forEach( 2, i->list.add(i));
  assertThat(list,hasItems(1,2));
  assertThat(list.size(),equalTo(2));
  s.request(1);
  assertThat(list,hasItems(1,2,3));
  assertThat(list.size(),equalTo(3));
}
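The test above relies on demand-driven consumption: forEach(2, ...) handles two elements and returns a Subscription, and request(1) then pulls one more. The sketch below imitates that contract with the JDK's java.util.concurrent.Flow.Subscription over a synchronous iterator. It is an illustration of the idea only, not how cyclops implements it, and the forEach helper shown is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

final class DemandDrivenForEachSketch {

    // Hypothetical helper: consumes `initial` elements immediately and returns
    // a Subscription through which the caller can request more.
    static <T> Flow.Subscription forEach(Iterable<T> source, long initial,
                                         Consumer<? super T> action) {
        Iterator<T> it = source.iterator();
        Flow.Subscription subscription = new Flow.Subscription() {
            private boolean cancelled;

            @Override
            public void request(long n) {
                // Hand over at most n elements, stopping early if exhausted or cancelled.
                for (long i = 0; i < n && !cancelled && it.hasNext(); i++) {
                    action.accept(it.next());
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        };
        subscription.request(initial);
        return subscription;
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        Flow.Subscription s = forEach(List.of(1, 2, 3), 2, seen::add);
        System.out.println(seen); // two elements so far
        s.request(1);
        System.out.println(seen); // third element delivered on demand
    }
}
```

This single-threaded version is synchronous, so the list can be inspected right after each call. With an asynchronously executing stream, the same assertions would need to wait for delivery.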

Code example source: aol/cyclops

@Test
public void fixedDelay2() {
  fromIntStream(IntStream.range(0, 1000))
      .fixedDelay(1L, TimeUnit.MICROSECONDS).peek(System.out::println)
      .forEach(a->{});
}

Code example source: aol/cyclops

@Test
public void forEachWithErrors2(){
  error = null; // error is a Throwable field declared on the test class
  List<Integer> result = new ArrayList<>();
  ReactiveSeq.of(1,2,3,4,5,6)
      .map(this::errors)
      .forEach(e->result.add(e), e->error=e);
  assertNotNull(error);
  System.out.println(result);
  assertThat(result,hasItems(1,3,4,5,6));
  assertThat(result,not(hasItems(2)));
}
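The assertions above imply that forEach(Consumer, Consumer) routes a failing element to the error consumer and carries on with the remaining elements. A minimal JDK-only sketch of that behaviour follows. It mimics what the test asserts rather than reproducing cyclops internals, and the helper signature is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

final class ForEachWithErrorsSketch {

    // Hypothetical helper: maps each element, sending results to onNext and
    // failures to onError, without stopping at the first failure.
    static <T, R> void forEach(List<T> source,
                               Function<? super T, ? extends R> mapper,
                               Consumer<? super R> onNext,
                               Consumer<? super Throwable> onError) {
        for (T element : source) {
            R mapped;
            try {
                mapped = mapper.apply(element);
            } catch (RuntimeException e) {
                onError.accept(e);
                continue; // skip the failed element, keep going
            }
            onNext.accept(mapped);
        }
    }

    public static void main(String[] args) {
        List<Integer> result = new ArrayList<>();
        AtomicReference<Throwable> error = new AtomicReference<>();
        ForEachWithErrorsSketch.<Integer, Integer>forEach(
                List.of(1, 2, 3, 4, 5, 6),
                i -> {
                    if (i == 2) throw new IllegalStateException("failed on " + i);
                    return i;
                },
                result::add,
                error::set);
        System.out.println(result + " / " + error.get());
    }
}
```

An AtomicReference holds the error here because a lambda cannot assign to a local variable; the original test uses a field for the same reason.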

Code example source: aol/cyclops

@Test
public void batchByTimex() {
      iterate("", last -> "next")
      .limit(100)
      .peek(next->System.out.println("Counter " +count2.incrementAndGet()))
      .groupedByTime(10, TimeUnit.MICROSECONDS)
      .peek(batch -> System.out.println("batched : " + batch))
      .filter(c->!c.isEmpty())
      .forEach(System.out::println);
}

Code example source: aol/cyclops

@Test
public void fixedDelay2() {
  Spouts.range(0, 1000)
      .fixedDelay(1L, TimeUnit.MICROSECONDS).peek(System.out::println)
      .forEach(a->{});
}

Code example source: aol/cyclops

@Test
public void batchBySize() {
  iterate("", last -> "next")
      .limit(100)
      .grouped(10)
      .onePer(1, TimeUnit.MICROSECONDS)
      .peek(batch -> System.out.println("batched : " + batch))
      .concatMap(i->i)
      .peek(individual -> System.out.println("Flattened : "
          + individual))
      .forEach(a->{});
}
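The pipeline above groups elements into batches of ten and then flattens the batches back into individual elements. The shape of that data flow can be sketched with plain JDK collections. The rate limiting done by onePer is left out, and the grouped and flatten helpers below are hypothetical stand-ins, not the cyclops operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchBySizeSketch {

    // Hypothetical helper: splits the source into consecutive batches of at
    // most `size` elements; the last batch may be shorter.
    static <T> List<List<T>> grouped(List<T> source, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < source.size(); from += size) {
            int to = Math.min(from + size, source.size());
            batches.add(new ArrayList<>(source.subList(from, to)));
        }
        return batches;
    }

    // Hypothetical helper: concatenates the batches back into one list.
    static <T> List<T> flatten(List<List<T>> batches) {
        return batches.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 100).boxed().collect(Collectors.toList());
        List<List<Integer>> batches = grouped(source, 10);
        batches.forEach(batch -> System.out.println("batched : " + batch));
        System.out.println(flatten(batches).equals(source));
    }
}
```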
