
x33g5p2x, reposted on 2022-01-16 in Other



A sequence of elements supporting sequential and parallel aggregate operations. The following example illustrates an aggregate operation using Stream and IntStream:

int sum = widgets.stream().filter(w -> w.getColor() == RED)
                 .mapToInt(w -> w.getWeight()).sum();

In this example, widgets is a Collection. We create a stream of Widget objects via Collection#stream, filter it to produce a stream containing only the red widgets, and then transform it into a stream of int values representing the weight of each red widget. Then this stream is summed to produce a total weight.
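
For readers who want to run the pipeline described above, here is a minimal self-contained sketch. The Widget class, the WidgetColor enum, and the WidgetWeights helper are hypothetical stand-ins; the original text only says that a widget has a color and a weight.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical types for illustration only.
enum WidgetColor { RED, BLUE }

final class Widget {
    private final WidgetColor color;
    private final int weight;

    Widget(WidgetColor color, int weight) {
        this.color = color;
        this.weight = weight;
    }

    WidgetColor getColor() { return color; }
    int getWeight() { return weight; }
}

final class WidgetWeights {
    // Sums the weights of the red widgets, following the steps in the paragraph above.
    static int totalRedWeight(Collection<Widget> widgets) {
        return widgets.stream()                               // Stream<Widget>
                .filter(w -> w.getColor() == WidgetColor.RED) // keep only the red widgets
                .mapToInt(w -> w.getWeight())                 // IntStream of weights
                .sum();                                       // terminal operation
    }

    public static void main(String[] args) {
        List<Widget> widgets = Arrays.asList(
                new Widget(WidgetColor.RED, 10),
                new Widget(WidgetColor.BLUE, 5),
                new Widget(WidgetColor.RED, 20));
        System.out.println(totalRedWeight(widgets)); // prints 30
    }
}
```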

In addition to Stream, which is a stream of object references, there are primitive specializations for IntStream, LongStream, and DoubleStream, all of which are referred to as "streams" and conform to the characteristics and restrictions described here.

To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc.), zero or more intermediate operations (which transform a stream into another stream, such as Stream#filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as Stream#count() or Stream#forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
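
The laziness described above can be observed with a small sketch. The class and method names are hypothetical, and peek is used only to record which source elements were pulled. The sketch assumes a sequential stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class LazyPipelineDemo {
    // Returns the source elements that were actually consumed while
    // searching for the first even number.
    static List<Integer> visitedBeforeFirstEven(List<Integer> source) {
        List<Integer> visited = new ArrayList<>();
        Stream<Integer> pipeline = source.stream()
                .peek(visited::add)          // intermediate: record each element pulled
                .filter(n -> n % 2 == 0);    // intermediate: keep even numbers

        // No terminal operation has run yet, so nothing has been consumed.
        if (!visited.isEmpty()) {
            throw new IllegalStateException("pipeline ran before the terminal operation");
        }

        pipeline.findFirst();                // terminal and short-circuiting
        return visited;
    }

    public static void main(String[] args) {
        // Elements after the first even number are never pulled from the source.
        System.out.println(visitedBeforeFirstEven(Arrays.asList(1, 3, 4, 5, 6))); // prints [1, 3, 4]
    }
}
```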

Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed in aggregate on that source. However, if the provided stream operations do not offer the desired functionality, the #iterator() and #spliterator() operations can be used to perform a controlled traversal.
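
As a sketch of the controlled traversal mentioned above, the following pulls elements through iterator() and stops on a condition that depends on a running total. The class name, method name, and stopping rule are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

final class ControlledTraversalDemo {
    // Takes elements one at a time until their running total reaches the limit.
    static List<Integer> takeUntilTotalReaches(Stream<Integer> stream, int limit) {
        List<Integer> taken = new ArrayList<>();
        int total = 0;
        Iterator<Integer> it = stream.iterator();   // iterator() is itself a terminal operation
        while (total < limit && it.hasNext()) {
            int next = it.next();
            taken.add(next);
            total += next;
        }
        return taken;
    }

    public static void main(String[] args) {
        // The source is infinite; the caller decides when to stop pulling.
        Stream<Integer> naturals = Stream.iterate(1, n -> n + 1);
        System.out.println(takeUntilTotalReaches(naturals, 10)); // prints [1, 2, 3, 4]
    }
}
```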

A stream pipeline, like the "widgets" example above, can be viewed as a query on the stream source. Unless the source was explicitly designed for concurrent modification (such as a ConcurrentHashMap), unpredictable or erroneous behavior may result from modifying the stream source while it is being queried.

Most stream operations accept parameters that describe user-specified behavior, such as the lambda expression w -> w.getWeight() passed to mapToInt in the example above. To preserve correct behavior, these behavioral parameters:

  • must be non-interfering (they do not modify the stream source); and
  • in most cases must be stateless (their result should not depend on any state that might change during execution of the stream pipeline).

Such parameters are always instances of a functional interface such as java.util.function.Function, and are often lambda expressions or method references. Unless otherwise specified these parameters must be non-null.
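
A brief sketch of what these requirements look like in practice. The class and method names are hypothetical. The first method uses a stateless, non-interfering function and lets the collector build the result; the second modifies the source only after the pipeline has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class BehavioralParameterDemo {
    // Stateless and non-interfering: the function reads only its argument,
    // and the result list is produced by the collector, not by a side effect.
    static List<String> upperCase(List<String> names) {
        return names.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Non-interference: decide what to remove inside the pipeline,
    // then modify the source once the terminal operation has completed.
    static void removeBlank(List<String> names) {
        List<String> blanks = names.stream()
                .filter(s -> s.trim().isEmpty())
                .collect(Collectors.toList());
        names.removeAll(blanks);
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList("ada", " ", "grace"));
        removeBlank(names);
        System.out.println(upperCase(names)); // prints [ADA, GRACE]
    }
}
```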

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.
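
The sketch below illustrates the single-use rule. It assumes the standard JDK stream implementation, which detects this particular kind of reuse; as noted above, detection is not guaranteed in every case. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class StreamReuseDemo {
    // Returns true if a second traversal of the same stream object is rejected.
    static boolean secondTraversalRejected() {
        Stream<String> stream = Stream.of("a", "b", "c");
        stream.forEach(s -> { });          // first traversal consumes the stream
        try {
            stream.forEach(s -> { });      // second traversal of the same object
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Instead of forking one stream, obtain a fresh stream from the source per pipeline.
    static long[] countEvensAndOdds(List<Integer> source) {
        long evens = source.stream().filter(n -> n % 2 == 0).count();
        long odds = source.stream().filter(n -> n % 2 != 0).count();
        return new long[] {evens, odds};
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalRejected());
        System.out.println(Arrays.toString(countEvensAndOdds(Arrays.asList(1, 2, 3, 4, 5))));
    }
}
```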

Streams have a #close() method and implement AutoCloseable, but nearly all stream instances do not actually need to be closed after use. Generally, only streams whose source is an I/O channel (such as those returned by Files#lines(Path,Charset)) will require closing. Most streams are backed by collections, arrays, or generating functions, which require no special resource management. (If a stream does require closing, it can be declared as a resource in a try-with-resources statement.)
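
A minimal sketch of the try-with-resources pattern for a file-backed stream. The class, the method names, and the temporary file are assumptions made so that the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class ClosingStreamDemo {
    // The stream holds an open file, so it is declared as a resource and
    // closed automatically when the try block exits.
    static long countNonBlankLines(Path file) throws IOException {
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.filter(line -> !line.trim().isEmpty()).count();
        }
    }

    // Helper for the demo: writes the content to a temporary file, counts, then cleans up.
    static long countInTempFile(List<String> content) {
        try {
            Path tmp = Files.createTempFile("stream-demo", ".txt");
            try {
                Files.write(tmp, content, StandardCharsets.UTF_8);
                return countNonBlankLines(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countInTempFile(Arrays.asList("alpha", "", "beta"))); // prints 2
    }
}
```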

Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. Streams are created with an initial choice of sequential or parallel execution. (For example, Collection#stream() creates a sequential stream, and Collection#parallelStream() creates a parallel one.) This choice of execution mode may be modified by the #sequential() or #parallel() methods, and may be queried with the #isParallel() method.
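
The execution-mode methods can be exercised as follows. This is a sketch with hypothetical class and method names; the source is assumed to be an ordinary list.

```java
import java.util.Arrays;
import java.util.List;

final class ExecutionModeDemo {
    // Reports the execution mode after each way of choosing or switching it.
    static boolean[] modes(List<Integer> source) {
        return new boolean[] {
            source.stream().isParallel(),                      // created sequential
            source.stream().parallel().isParallel(),           // switched to parallel
            source.parallelStream().sequential().isParallel()  // switched back to sequential
        };
    }

    // The result of a well-behaved reduction does not depend on the execution mode.
    static int sum(List<Integer> source, boolean parallel) {
        return (parallel ? source.parallelStream() : source.stream())
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(Arrays.toString(modes(numbers))); // prints [false, true, false]
        System.out.println(sum(numbers, false) == sum(numbers, true)); // prints true
    }
}
```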



canonical example by Tabnine

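public List<Integer> findDivisors(int number) {
 return Stream.iterate(1, k -> ++k)
   .limit(number)                    // bound the otherwise infinite stream to 1..number
   .filter(k -> number % k == 0)     // keep the values that divide number evenly
   .collect(Collectors.toList());
}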

canonical example by Tabnine

public void printFibonacciSequence(int length) {
   System.out.println(
     Stream.iterate(new long[] {0, 1}, pair -> new long[] {pair[1], pair[0] + pair[1]})
     .limit(length)                  // bound the otherwise infinite stream
     .map(pair -> Long.toString(pair[1]))
     .collect(Collectors.joining(", ")));
}

Code example source: spring-projects/spring-framework

private List<SyncHandlerMethodArgumentResolver> initBinderResolvers(
    ArgumentResolverConfigurer customResolvers, ReactiveAdapterRegistry reactiveRegistry,
    ConfigurableApplicationContext context) {
  return initResolvers(customResolvers, reactiveRegistry, context, false, Collections.emptyList()).stream()
      .filter(resolver -> resolver instanceof SyncHandlerMethodArgumentResolver)
      .map(resolver -> (SyncHandlerMethodArgumentResolver) resolver)
      .collect(Collectors.toList());
}

Code example source: spring-projects/spring-framework

/**
 * Provide the TaskScheduler to use for SockJS endpoints for which a task
 * scheduler has not been explicitly registered. This method must be called
 * prior to {@link #getHandlerMapping()}.
 */
protected void setTaskScheduler(TaskScheduler scheduler) {
      .filter(r -> r.getTaskScheduler() == null)
      .forEach(registration -> registration.setTaskScheduler(scheduler));

Code example source: spring-projects/spring-framework

public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
  PathContainer lookupPath = exchange.getRequest().getPath().pathWithinApplication();
  return this.corsConfigurations.entrySet().stream()
      .filter(entry -> entry.getKey().matches(lookupPath))

Code example source: spring-projects/spring-framework

/**
 * Match handlers declared under a base package, e.g. "org.example".
 * @param packages one or more base package classes
 */
public Builder basePackage(String... packages) {;
  return this;

Code example source: spring-projects/spring-framework

private static MediaType initDefaultMediaType(List<MediaType> mediaTypes) {

Code example source: prestodb/presto

public List<Symbol> getOriginalNonDistinctAggregateArgs()
  return aggregations.values().stream()
      .filter(aggregation -> !aggregation.getMask().isPresent())
      .flatMap(function -> function.getArguments().stream())

Code example source: spring-projects/spring-framework

private static List<PathPattern> parse(String[] paths, PathPatternParser parser) {
    return Arrays
        .map(path -> {
          if (StringUtils.hasText(path) && !path.startsWith("/")) {
            path = "/" + path;
          return parser.parse(path);

Code example source: prestodb/presto

public Map<String, PartitionStatistics> getPartitionStatistics(String databaseName, String tableName, Set<String> partitionNames)
  List<HivePartitionName> partitions =
      .map(partitionName -> HivePartitionName.hivePartitionName(databaseName, tableName, partitionName))
  Map<HivePartitionName, PartitionStatistics> statistics = getAll(partitionStatisticsCache, partitions);
  return statistics.entrySet()
      .collect(toImmutableMap(entry -> entry.getKey().getPartitionName().get(), Entry::getValue));

Code example source: apache/incubator-dubbo

public synchronized void notify(List<URL> urls) {
  List<URL> categoryUrls =
   * TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
  this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator))
  toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);
  // providers
  refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));

Code example source: google/guava

public void testToOptionalNull() {
 Stream<Object> stream = Stream.of((Object) null);
 try {
  fail("Expected NullPointerException");
 } catch (NullPointerException expected) {

Code example source: prestodb/presto

public static RowType anonymous(List<Type> types)
  List<Field> fields =
      .map(type -> new Field(Optional.empty(), type))
  return new RowType(makeSignature(fields), fields);

Code example source: prestodb/presto

private synchronized Collection<HivePrivilegeInfo> getTablePrivileges(
    Path permissionsDirectory,
    String principalName,
    PrincipalType principalType)
  Path permissionFilePath = getPermissionsPath(permissionsDirectory, principalName, principalType);
  return readFile("permissions", permissionFilePath, permissionsCodec).orElse(ImmutableList.of()).stream()

Code example source: prestodb/presto

private static PlanFragment createExchangePlanFragment(String name, PlanFragment... fragments)
  PlanNode planNode = new RemoteSourceNode(
      new PlanNodeId(name + "_id"),
  return createFragment(planNode);

Code example source: prestodb/presto

public Supplier<PageProcessor> createPageProcessor(Page filterTuple, OptionalInt initialBatchSize)
  TuplePageFilter filter = new TuplePageFilter(filterTuple, filterTypes, outputFilterChannels);
  return () -> new PageProcessor(
          .collect(toImmutableList()), initialBatchSize);

Code example source: prestodb/presto

private Set<NullableValue> filterValues(Set<NullableValue> nullableValues, TpchColumn<?> column, Constraint<ColumnHandle> constraint)
      .filter(convertToPredicate(constraint.getSummary(), toColumnHandle(column)))
      .filter(value -> !constraint.predicate().isPresent() || constraint.predicate().get().test(ImmutableMap.of(toColumnHandle(column), value)))

Code example source: spring-projects/spring-framework

private List<MediaType> getMediaTypesFor(ResolvableType elementType) {
  return getMessageWriters().stream()
      .filter(converter -> converter.canWrite(elementType, null))
      .flatMap(converter -> converter.getWritableMediaTypes().stream())

Code example source: prestodb/presto

public List<SchemaTableName> listTables(Optional<String> schemaName)
  return tableDescriptions.getAllSchemaTableNames()
      .filter(schemaTableName -> !schemaName.isPresent() || schemaTableName.getSchemaName().equals(schemaName.get()))

Code example source: prestodb/presto

private static TypeSignature makeSignature(List<Field> fields)
  int size = fields.size();
  if (size == 0) {
    throw new IllegalArgumentException("Row type must have at least 1 field");
  List<TypeSignatureParameter> parameters =
      .map(field -> TypeSignatureParameter.of(new NamedTypeSignature(field.getName().map(name -> new RowFieldName(name, false)), field.getType().getTypeSignature())))
  return new TypeSignature(ROW, parameters);
