akka.stream.javadsl.Source.fold()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(13.4k)|赞(0)|评价(0)|浏览(106)

本文整理了Java中akka.stream.javadsl.Source.fold()方法的一些代码示例,展示了Source.fold()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Source.fold()方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:fold

Source.fold介绍

暂无

代码示例

代码示例来源:origin: eclipse/ditto

/**
 * Gets all indices defined on the given collection, including the default index defined on "_id".
 *
 * @param collectionName the name of the collection.
 * @return a source which emits the list of found indices.
 */
public Source<List<Index>, NotUsed> getIndices(final String collectionName) {
  return Source.fromPublisher(getCollection(collectionName).listIndexes())
      .map(Index::indexInfoOf)
      .fold(new ArrayList<Index>(), (aggregate, element) -> {
        aggregate.add(element);
        return aggregate;
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

/**
 * Gets all indices defined on the given collection, including the default index defined on "_id".
 *
 * @param collectionName the name of the collection.
 * @return a source which emits the list of found indices.
 */
public Source<List<Index>, NotUsed> getIndices(final String collectionName) {
  return Source.fromPublisher(getCollection(collectionName).listIndexes())
      .map(Index::indexInfoOf)
      .fold(new ArrayList<Index>(), (aggregate, element) -> {
        aggregate.add(element);
        return aggregate;
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<SearchNamespaceReportResult, NotUsed> generateNamespaceCountReport() {
  final AggregatePublisher<Document> aggregatePublisher = collection.aggregate(
      Collections.singletonList(
          new Document("$group",
              new Document(PersistenceConstants.FIELD_ID, "$_namespace")
                  .append(PersistenceConstants.FIELD_COUNT, new Document("$sum", 1))
          )
      )
  );
  return Source.fromPublisher(aggregatePublisher)
      .map(document -> {
        final String namespace = (document.get(PersistenceConstants.FIELD_ID) != null)
            ? document.get(PersistenceConstants.FIELD_ID).toString()
            : "NOT_MIGRATED";
        final long count = Long.parseLong(document.get(PersistenceConstants.FIELD_COUNT).toString());
        return new SearchNamespaceResultEntry(namespace, count);
      })
      .fold(new ArrayList<SearchNamespaceResultEntry>(), (list, entry) -> {
        list.add(entry);
        return list;
      })
      .<SearchNamespaceReportResult>map(SearchNamespaceReportResult::new);
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<SearchNamespaceReportResult, NotUsed> generateNamespaceCountReport() {
  final AggregatePublisher<Document> aggregatePublisher = collection.aggregate(
      Collections.singletonList(
          new Document("$group",
              new Document(PersistenceConstants.FIELD_ID, "$_namespace")
                  .append(PersistenceConstants.FIELD_COUNT, new Document("$sum", 1))
          )
      )
  );
  return Source.fromPublisher(aggregatePublisher)
      .map(document -> {
        final String namespace = (document.get(PersistenceConstants.FIELD_ID) != null)
            ? document.get(PersistenceConstants.FIELD_ID).toString()
            : "NOT_MIGRATED";
        final long count = Long.parseLong(document.get(PersistenceConstants.FIELD_COUNT).toString());
        return new SearchNamespaceResultEntry(namespace, count);
      })
      .fold(new ArrayList<SearchNamespaceResultEntry>(), (list, entry) -> {
        list.add(entry);
        return list;
      })
      .<SearchNamespaceReportResult>map(SearchNamespaceReportResult::new);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-gateway-util

/**
 * Unfolds the entity into a string using akka streams.
 *
 * @param entity the entity to unfold.
 * @param materializer the materializer to run the unfold stream.
 * @return the string representation of the entity.
 */
public static String entityToString(final HttpEntity entity, final Materializer materializer) {
  requireNonNull(entity);
  requireNonNull(materializer);
  if (entity.isKnownEmpty()) {
    return "";
  }
  try {
    return entity.getDataBytes()
        .fold(ByteString.empty(), ByteString::concat)
        .map(ByteString::utf8String)
        .runWith(Sink.head(), materializer)
        .toCompletableFuture()
        .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
  } catch (final InterruptedException | ExecutionException | TimeoutException e) {
    throw new IllegalStateException("Failed to read entity.", e);
  }
}

代码示例来源:origin: eclipse/ditto

/**
 * Unfolds the entity into a string using akka streams.
 *
 * @param entity the entity to unfold.
 * @param materializer the materializer to run the unfold stream.
 * @return the string representation of the entity.
 */
public static String entityToString(final HttpEntity entity, final Materializer materializer) {
  requireNonNull(entity);
  requireNonNull(materializer);
  if (entity.isKnownEmpty()) {
    return "";
  }
  try {
    return entity.getDataBytes()
        .fold(ByteString.empty(), ByteString::concat)
        .map(ByteString::utf8String)
        .runWith(Sink.head(), materializer)
        .toCompletableFuture()
        .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
  } catch (final InterruptedException | ExecutionException | TimeoutException e) {
    throw new IllegalStateException("Failed to read entity.", e);
  }
}

代码示例来源:origin: eclipse/ditto

private CompletableFuture<JsonArray> mapResponseToJsonArray(final HttpResponse response) {
  final CompletionStage<JsonObject> body =
      response.entity().getDataBytes().fold(ByteString.empty(), ByteString::concat)
          .map(ByteString::utf8String)
          .map(JsonFactory::readFrom)
          .map(JsonValue::asObject)
          .runWith(Sink.head(), httpClient.getActorMaterializer());
  final JsonPointer keysPointer = JsonPointer.of("keys");
  return body.toCompletableFuture()
      .thenApply(jsonObject -> jsonObject.getValue(keysPointer).map(JsonValue::asArray)
          .orElseThrow(() -> new JsonMissingFieldException(keysPointer)))
      .exceptionally(t -> {
        throw new IllegalStateException("Failed to extract public keys from JSON response: " + body, t);
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
  log.debug("Retrieving Thing ids for policy: <{}>", policyId);
  final Bson filter = eq(FIELD_POLICY_ID, policyId);
  return Source.fromPublisher(collection.find(filter)
      .projection(new BsonDocument(FIELD_ID, new BsonInt32(1))))
      .map(doc -> doc.getString(FIELD_ID))
      .fold(new HashSet<>(), (set, id) -> {
        set.add(id);
        return set;
      });
}

代码示例来源:origin: eclipse/ditto

@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
  log.debug("Retrieving Thing ids for policy: <{}>", policyId);
  final Bson filter = eq(FIELD_POLICY_ID, policyId);
  return Source.fromPublisher(collection.find(filter)
      .projection(new BsonDocument(FIELD_ID, new BsonInt32(1))))
      .map(doc -> doc.getString(FIELD_ID))
      .fold(new HashSet<>(), (set, id) -> {
        set.add(id);
        return set;
      });
}

代码示例来源:origin: eclipse/ditto

private Route handleDevOpsPerRequest(final RequestContext ctx,
    final Source<ByteString, ?> payloadSource,
    final Function<String, DevOpsCommand> requestJsonToCommandFunction) {
  final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
  payloadSource
      .fold(ByteString.empty(), ByteString::concat)
      .map(ByteString::utf8String)
      .map(requestJsonToCommandFunction)
      .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
          HttpRequestActor.COMPLETE_MESSAGE))
      .run(materializer);
  return completeWithFuture(httpResponseFuture);
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<ResultList<String>, NotUsed> findAll(final PolicyRestrictedSearchAggregation aggregation) {
  checkNotNull(aggregation, "aggregation");
  final Source<Document, NotUsed> source = aggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, aggregation.getSkip(), aggregation.getLimit()))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<ResultList<String>, NotUsed> findAll(final PolicyRestrictedSearchAggregation aggregation) {
  checkNotNull(aggregation, "aggregation");
  final Source<Document, NotUsed> source = aggregation.execute(collection, maxQueryTime);
  return source.map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, aggregation.getSkip(), aggregation.getLimit()))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: eclipse/ditto

private Route handleMessage(final RequestContext ctx, final Source<ByteString, Object> payloadSource,
    final Function<ByteBuffer, MessageCommand<?, ?>> requestPayloadToCommandFunction) {
  final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
  payloadSource.fold(ByteString.empty(), ByteString::concat)
      .map(ByteString::toArray)
      .map(ByteBuffer::wrap)
      .map(requestPayloadToCommandFunction)
      .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
          HttpRequestActor.COMPLETE_MESSAGE))
      .run(materializer);
  return completeWithFuture(preprocessResponse(httpResponseFuture));
}

代码示例来源:origin: eclipse/ditto

@Override
public Source<ResultList<String>, NotUsed> findAll(final Query query) {
  checkNotNull(query, "query");
  final BsonDocument queryFilter = getMongoFilter(query);
  if (log.isDebugEnabled()) {
    log.debug("findAll with query filter <{}>.", queryFilter);
  }
  final Bson filter = and(filterNotDeleted(), queryFilter);
  final Optional<Bson> sortOptions = Optional.of(getMongoSort(query));
  final int limit = query.getLimit();
  final int skip = query.getSkip();
  final Bson projection = new Document(PersistenceConstants.FIELD_ID, 1);
  return Source.fromPublisher(collection.find(filter, Document.class)
      .sort(sortOptions.orElse(null))
      .limit(limit + 1)
      .skip(skip)
      .projection(projection)
      .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
  )
      .map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, skip, limit))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

@Override
public Source<ResultList<String>, NotUsed> findAll(final Query query) {
  checkNotNull(query, "query");
  final BsonDocument queryFilter = getMongoFilter(query);
  if (log.isDebugEnabled()) {
    log.debug("findAll with query filter <{}>.", queryFilter);
  }
  final Bson filter = and(filterNotDeleted(), queryFilter);
  final Optional<Bson> sortOptions = Optional.of(getMongoSort(query));
  final int limit = query.getLimit();
  final int skip = query.getSkip();
  final Bson projection = new Document(PersistenceConstants.FIELD_ID, 1);
  return Source.fromPublisher(collection.find(filter, Document.class)
      .sort(sortOptions.orElse(null))
      .limit(limit + 1)
      .skip(skip)
      .projection(projection)
      .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
  )
      .map(doc -> doc.getString(PersistenceConstants.FIELD_ID))
      .fold(new ArrayList<String>(), (list, id) -> {
        list.add(id);
        return list;
      })
      .map(resultsPlus0ne -> toResultList(resultsPlus0ne, skip, limit))
      .mapError(handleMongoExecutionTimeExceededException())
      .log("findAll");
}

代码示例来源:origin: eclipse/ditto

private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
  final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
  Source.single(command)
      .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
          HttpRequestActor.COMPLETE_MESSAGE))
      .run(materializer);
  final CompletionStage<HttpResponse> allThingsCountHttpResponse = Source.fromCompletionStage(httpResponseFuture)
      .flatMapConcat(httpResponse -> httpResponse.entity().getDataBytes())
      .fold(ByteString.empty(), ByteString::concat)
      .map(ByteString::utf8String)
      .map(Integer::valueOf)
      .map(count -> JsonObject.newBuilder().set("allThingsCount", count).build())
      .map(jsonObject -> HttpResponse.create()
          .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(jsonObject.toString()))
          .withStatus(HttpStatusCode.OK.toInt()))
      .runWith(Sink.head(), materializer);
  return completeWithFuture(allThingsCountHttpResponse);
}

代码示例来源:origin: eclipse/ditto

.fold(ByteString.empty(), ByteString::concat)
.map(ByteString::utf8String)
.map(requestJsonToCommandFunction)
  final InputStream inputStream = response.entity()
      .getDataBytes()
      .fold(ByteString.empty(), ByteString::concat)
      .runWith(StreamConverters.asInputStream(), materializer);
  final JsonValue jsonValue = JsonFactory.readFrom(new InputStreamReader(inputStream));

代码示例来源:origin: eclipse/ditto

private Sink<Message, NotUsed> createSink(final Integer version, final String connectionCorrelationId,
    final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
    final ProtocolAdapter adapter) {
  return Flow.<Message>create()
      .filter(Message::isText)
      .map(Message::asTextMessage)
      .map(textMsg -> {
        if (textMsg.isStrict()) {
          return Source.single(textMsg.getStrictText());
        } else {
          return textMsg.getStreamedText();
        }
      })
      .flatMapConcat(textMsg -> textMsg.<String>fold("", (str1, str2) -> str1 + str2))
      .via(Flow.fromFunction(result -> {
        LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
            logger.debug("Received incoming WebSocket message: {}", result));
        return result;
      }))
      .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
          Logging.WarningLevel()))
      .filter(strictText -> processProtocolMessage(connectionAuthContext, connectionCorrelationId,
          strictText))
      .map(buildSignal(version, connectionCorrelationId, connectionAuthContext, additionalHeaders, adapter))
      .to(Sink.actorSubscriber(
          CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream)));
}

相关文章