akka.stream.javadsl.Source.map()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(12.3k)|赞(0)|评价(0)|浏览(95)

本文整理了Java中akka.stream.javadsl.Source.map()方法的一些代码示例,展示了Source.map()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Source.map()方法的具体详情如下:
包路径:akka.stream.javadsl.Source
类名称:Source
方法名:map

Source.map介绍

暂无

代码示例

代码示例来源:origin: com.typesafe.play/play

/**
 * Builds a chunked entity from a stream of ByteStrings.
 *
 * Every ByteString emitted by {@code data} is wrapped in an {@code HttpChunk.Chunk}.
 *
 * @param data The source of ByteStrings to send as chunks.
 * @param contentType The optional content type.
 * @return The chunked entity.
 */
public static final HttpEntity chunked(Source<ByteString, ?> data, Optional<String> contentType) {
  final Source<HttpChunk, ?> chunks = data.map(HttpChunk.Chunk::new);
  return new Chunked(chunks, contentType);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * Executes the given aggregation and emits the count it reports.
 *
 * The count is read from the {@code PersistenceConstants.COUNT_RESULT_NAME} field of each result
 * document. If the aggregation completes without emitting a document, {@code 0} is emitted instead.
 *
 * @param policyRestrictedSearchAggregation the aggregation to execute; checked with
 *     {@code checkNotNull}.
 * @return a source of counts, logged under the name "count".
 */
@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");

  final Source<Document, NotUsed> resultDocuments =
      policyRestrictedSearchAggregation.execute(collection, maxQueryTime);

  // Number.longValue() is used so that both Integer and Long count values are supported.
  final Source<Long, NotUsed> counts = resultDocuments
      .map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
      .map(countResult -> (Number) countResult)
      .map(Number::longValue);

  final Source<Long, NotUsed> countsOrZero = counts.orElse(Source.<Long>single(0L));

  // NOTE(review): presumably translates Mongo's "execution time exceeded" failure - confirm in helper.
  return countsOrZero
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Builds a chunked entity from a stream of ByteStrings.
 *
 * Every ByteString emitted by {@code data} is wrapped in an {@code HttpChunk.Chunk}.
 *
 * @param data The source of ByteStrings to send as chunks.
 * @param contentType The optional content type.
 * @return The chunked entity.
 */
public static final HttpEntity chunked(Source<ByteString, ?> data, Optional<String> contentType) {
  final Source<HttpChunk, ?> chunks = data.map(HttpChunk.Chunk::new);
  return new Chunked(chunks, contentType);
}

代码示例来源:origin: eclipse/ditto

/**
 * Executes the given aggregation and emits the count it reports.
 *
 * The count is read from the {@code PersistenceConstants.COUNT_RESULT_NAME} field of each result
 * document. If the aggregation completes without emitting a document, {@code 0} is emitted instead.
 *
 * @param policyRestrictedSearchAggregation the aggregation to execute; checked with
 *     {@code checkNotNull}.
 * @return a source of counts, logged under the name "count".
 */
@Override
public Source<Long, NotUsed> count(final PolicyRestrictedSearchAggregation policyRestrictedSearchAggregation) {
  checkNotNull(policyRestrictedSearchAggregation, "policy restricted aggregation");

  final Source<Document, NotUsed> resultDocuments =
      policyRestrictedSearchAggregation.execute(collection, maxQueryTime);

  // Number.longValue() is used so that both Integer and Long count values are supported.
  final Source<Long, NotUsed> counts = resultDocuments
      .map(doc -> doc.get(PersistenceConstants.COUNT_RESULT_NAME))
      .map(countResult -> (Number) countResult)
      .map(Number::longValue);

  final Source<Long, NotUsed> countsOrZero = counts.orElse(Source.<Long>single(0L));

  // NOTE(review): presumably translates Mongo's "execution time exceeded" failure - confirm in helper.
  return countsOrZero
      .mapError(handleMongoExecutionTimeExceededException())
      .log("count");
}

代码示例来源:origin: com.typesafe.play/play_2.11

/**
 * Builds a chunked entity from a stream of ByteStrings.
 *
 * Every ByteString emitted by {@code data} is wrapped in an {@code HttpChunk.Chunk}.
 *
 * @param data The source of ByteStrings to send as chunks.
 * @param contentType The optional content type.
 * @return The chunked entity.
 */
public static final HttpEntity chunked(Source<ByteString, ?> data, Optional<String> contentType) {
  final Source<HttpChunk, ?> chunks = data.map(HttpChunk.Chunk::new);
  return new Chunked(chunks, contentType);
}

代码示例来源:origin: com.typesafe.play/play_2.11

/**
 * Converts a stream of {@code Http.MultipartFormData} parts into a stream of ByteStrings by
 * running them through {@code Multipart.format}.
 *
 * Each {@code Http.MultipartFormData.DataPart} becomes a {@code MultipartFormData.DataPart}; each
 * {@code Http.MultipartFormData.FilePart} whose {@code ref} is a {@code Source} becomes a
 * {@code MultipartFormData.FilePart} holding the {@code asScala()} view of that source. Any other
 * element makes the mapping stage throw {@link UnsupportedOperationException}.
 *
 * @param parts the parts to format.
 * @param boundary the boundary passed to {@code Multipart.format}.
 * @return the formatted ByteStrings.
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
    if (part instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) part;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (part instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) part;
      if (filePart.ref instanceof Source) {
        Source fileContent = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(), filePart.getFilename(), contentType, fileContent.asScala(),
            filePart.getFileSize(), filePart.getDispositionType());
      }
    }
    // Reached for unknown part types and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  // NOTE(review): uses the platform default charset; 4096 is presumably the chunk size - confirm.
  return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Converts a stream of {@code Http.MultipartFormData} parts into a stream of ByteStrings by
 * running them through {@code Multipart.format}.
 *
 * Each {@code Http.MultipartFormData.DataPart} becomes a {@code MultipartFormData.DataPart}; each
 * {@code Http.MultipartFormData.FilePart} whose {@code ref} is a {@code Source} becomes a
 * {@code MultipartFormData.FilePart} holding the {@code asScala()} view of that source. Any other
 * element makes the mapping stage throw {@link UnsupportedOperationException}.
 *
 * @param parts the parts to format.
 * @param boundary the boundary passed to {@code Multipart.format}.
 * @return the formatted ByteStrings.
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
    if (part instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) part;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (part instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) part;
      if (filePart.ref instanceof Source) {
        Source fileContent = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(), filePart.getFilename(), contentType, fileContent.asScala(),
            filePart.getFileSize(), filePart.getDispositionType());
      }
    }
    // Reached for unknown part types and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  // NOTE(review): uses the platform default charset; 4096 is presumably the chunk size - confirm.
  return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: com.typesafe.play/play

/**
 * Converts a stream of {@code Http.MultipartFormData} parts into a stream of ByteStrings by
 * running them through {@code Multipart.format}.
 *
 * Each {@code Http.MultipartFormData.DataPart} becomes a {@code MultipartFormData.DataPart}; each
 * {@code Http.MultipartFormData.FilePart} whose {@code ref} is a {@code Source} becomes a
 * {@code MultipartFormData.FilePart} holding the {@code asScala()} view of that source. Any other
 * element makes the mapping stage throw {@link UnsupportedOperationException}.
 *
 * @param parts the parts to format.
 * @param boundary the boundary passed to {@code Multipart.format}.
 * @return the formatted ByteStrings.
 */
public static Source<ByteString, ?> transform(Source<? super Http.MultipartFormData.Part<Source<ByteString, ?>>, ?> parts, String boundary) {
  Source<MultipartFormData.Part<akka.stream.scaladsl.Source<ByteString, ?>>, ?> source = parts.map((part) -> {
    if (part instanceof Http.MultipartFormData.DataPart) {
      Http.MultipartFormData.DataPart dataPart = (Http.MultipartFormData.DataPart) part;
      return (MultipartFormData.Part) new MultipartFormData.DataPart(dataPart.getKey(), dataPart.getValue());
    }
    if (part instanceof Http.MultipartFormData.FilePart) {
      Http.MultipartFormData.FilePart filePart = (Http.MultipartFormData.FilePart) part;
      if (filePart.ref instanceof Source) {
        Source fileContent = (Source) filePart.ref;
        Option<String> contentType = Option.apply(filePart.getContentType());
        return (MultipartFormData.Part) new MultipartFormData.FilePart<akka.stream.scaladsl.Source<ByteString, ?>>(
            filePart.getKey(), filePart.getFilename(), contentType, fileContent.asScala(),
            filePart.getFileSize(), filePart.getDispositionType());
      }
    }
    // Reached for unknown part types and for file parts whose ref is not a Source.
    throw new UnsupportedOperationException("Unsupported Part Class");
  });
  // NOTE(review): uses the platform default charset; 4096 is presumably the chunk size - confirm.
  return source.via(Multipart.format(boundary, Charset.defaultCharset(), 4096));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

/**
 * Lists the indices of a collection, leaving out the one named {@code DEFAULT_INDEX_NAME}
 * (the default index on "_id").
 *
 * @param collectionName the name of the collection.
 * @return a source which emits the list of remaining indices.
 */
public Source<List<Index>, NotUsed> getIndicesExceptDefaultIndex(final String collectionName) {
  return getIndices(collectionName).map(allIndices -> {
    final List<Index> withoutDefault = allIndices.stream()
        .filter(index -> !DEFAULT_INDEX_NAME.equals(index.getName()))
        .collect(Collectors.toList());
    return withoutDefault;
  });
}

代码示例来源:origin: com.lightbend.lagom/lagom-javadsl-server

/**
 * Serves a stream of circuit breaker status lists.
 *
 * The returned source ticks first after 100 ms and then every 2 seconds; each tick is replaced
 * by the current result of {@code allCircuitBreakerStatus()}.
 *
 * @return a service call whose invocation throws {@code NotFound} when {@code provider} is
 *     absent, and otherwise completes immediately with the ticking source.
 */
@Override
public ServiceCall<NotUsed, Source<List<CircuitBreakerStatus>, ?>> circuitBreakers() {
 return request -> {
  if (!provider.isPresent()) {
   throw new NotFound("No metrics");
  }
  final Duration initialDelay = Duration.ofMillis(100);
  final Duration interval = Duration.ofSeconds(2);
  Source<List<CircuitBreakerStatus>, ?> statuses =
   Source.tick(initialDelay, interval, "tick")
    .map(tick -> allCircuitBreakerStatus());
  return CompletableFuture.completedFuture(statuses);
 };
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Turns every outdated thing ID reported for a policy tag into a {@code PolicyReferenceTag}.
 *
 * @param policyTag the policy tag whose outdated things are looked up.
 * @param searchUpdaterPersistence the persistence queried via {@code getOutdatedThingIds}.
 * @return a source emitting one {@code PolicyReferenceTag} per outdated thing ID.
 */
private static Source<Object, NotUsed> toPolicyReferenceTags(final PolicyTag policyTag,
      final ThingsSearchUpdaterPersistence searchUpdaterPersistence) {

    // The element type is widened to Object explicitly to match the declared return type.
    return searchUpdaterPersistence.getOutdatedThingIds(policyTag)
        .<Object>map(outdatedThingId -> PolicyReferenceTag.of(outdatedThingId, policyTag));
  }
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-persistence

/**
 * Inserts the given timestamp into {@code lastSuccessfulSearchSyncCollection}.
 *
 * The instant is converted to a {@link Date} and stored under {@code FIELD_TIMESTAMP}.
 *
 * @param timestamp the timestamp to store.
 * @return a source emitting {@code NotUsed} for each result of the insert, after logging at
 *     debug level.
 */
@Override
public Source<NotUsed, NotUsed> updateLastSuccessfulStreamEnd(final Instant timestamp) {
  final Document timestampDocument = new Document().append(FIELD_TIMESTAMP, Date.from(timestamp));

  return Source.fromPublisher(lastSuccessfulSearchSyncCollection.insertOne(timestampDocument))
      .map(insertResult -> {
        LOGGER.debug("Successfully inserted timestamp for search synchronization: <{}>.", timestamp);
        return NotUsed.getInstance();
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * Retrieves the IDs of all things that reference the given policy.
 *
 * Only the {@code FIELD_ID} field of each matching document is projected; the IDs are collected
 * into a single set, which is emitted when the query completes (an empty set if nothing matched).
 *
 * @param policyId the policy ID to match against {@code FIELD_POLICY_ID}.
 * @return a source emitting one set of thing IDs.
 */
@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
  log.debug("Retrieving Thing ids for policy: <{}>", policyId);
  final Bson filter = eq(FIELD_POLICY_ID, policyId);
  final BsonDocument idOnly = new BsonDocument(FIELD_ID, new BsonInt32(1));
  // The accumulator set is mutable, so it must not be created once and shared by every
  // materialization of the returned source. Building the fold inside flatMapConcat creates a
  // fresh set each time the stream is run.
  return Source.single(filter)
      .flatMapConcat(policyFilter -> Source.fromPublisher(collection.find(policyFilter).projection(idOnly))
          .map(doc -> doc.getString(FIELD_ID))
          .<Set<String>>fold(new HashSet<>(), (set, id) -> {
            set.add(id);
            return set;
          }));
}

代码示例来源:origin: eclipse/ditto

/**
 * Retrieves the IDs of all things that reference the given policy.
 *
 * Only the {@code FIELD_ID} field of each matching document is projected; the IDs are collected
 * into a single set, which is emitted when the query completes (an empty set if nothing matched).
 *
 * @param policyId the policy ID to match against {@code FIELD_POLICY_ID}.
 * @return a source emitting one set of thing IDs.
 */
@Override
public final Source<Set<String>, NotUsed> getThingIdsForPolicy(final String policyId) {
  log.debug("Retrieving Thing ids for policy: <{}>", policyId);
  final Bson filter = eq(FIELD_POLICY_ID, policyId);
  final BsonDocument idOnly = new BsonDocument(FIELD_ID, new BsonInt32(1));
  // The accumulator set is mutable, so it must not be created once and shared by every
  // materialization of the returned source. Building the fold inside flatMapConcat creates a
  // fresh set each time the stream is run.
  return Source.single(filter)
      .flatMapConcat(policyFilter -> Source.fromPublisher(collection.find(policyFilter).projection(idOnly))
          .map(doc -> doc.getString(FIELD_ID))
          .<Set<String>>fold(new HashSet<>(), (set, id) -> {
            set.add(id);
            return set;
          }));
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * Applies {@code document} to the entries matching {@code filter} via {@code updateOne} and, if
 * anything matched, removes the policy index entries of the thing.
 *
 * @param thingId the ID of the thing whose policy index entries are removed.
 * @param filter the filter passed to {@code updateOne}.
 * @param document the update passed to {@code updateOne}.
 * @return a source emitting {@code false} when the update matched nothing, otherwise
 *     {@code true} for each result emitted by the policy entry removal.
 */
private Source<Boolean, NotUsed> delete(final String thingId, final Bson filter, final Bson document) {
  return Source.fromPublisher(collection.updateOne(filter, document))
      .flatMapConcat(updateResult -> {
        final boolean thingMatched = updateResult.getMatchedCount() > 0;
        if (!thingMatched) {
          return Source.single(Boolean.FALSE);
        }
        final Bson policyIndexRemoveFilter =
            PolicyUpdateFactory.createDeleteThingUpdate(thingId).getPolicyIndexRemoveFilter();
        return Source.fromPublisher(policiesCollection.deleteMany(policyIndexRemoveFilter))
            .map(ignored -> true);
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * {@inheritDoc}
 *
 * <p>Looks the thing up by {@code FIELD_ID}, projecting only the revision, policy ID and policy
 * revision fields, and maps the result with {@code mapThingMetadataToModel()}. When the query
 * emits nothing, the elements of {@code defaultThingMetadata()} are emitted instead.
 */
@Override
public final Source<ThingMetadata, NotUsed> getThingMetadata(final String thingId) {
  log.debug("Retrieving Thing Metadata for Thing: <{}>", thingId);
  final Bson metadataFields = Projections.include(FIELD_REVISION, FIELD_POLICY_ID, FIELD_POLICY_REVISION);
  final Bson byThingId = eq(FIELD_ID, thingId);
  return Source.fromPublisher(collection.find(byThingId).projection(metadataFields))
      .map(mapThingMetadataToModel())
      .orElse(defaultThingMetadata());
}

代码示例来源:origin: eclipse/ditto

/**
 * {@inheritDoc}
 *
 * <p>Looks the thing up by {@code FIELD_ID}, projecting only the revision, policy ID and policy
 * revision fields, and maps the result with {@code mapThingMetadataToModel()}. When the query
 * emits nothing, the elements of {@code defaultThingMetadata()} are emitted instead.
 */
@Override
public final Source<ThingMetadata, NotUsed> getThingMetadata(final String thingId) {
  log.debug("Retrieving Thing Metadata for Thing: <{}>", thingId);
  final Bson metadataFields = Projections.include(FIELD_REVISION, FIELD_POLICY_ID, FIELD_POLICY_REVISION);
  final Bson byThingId = eq(FIELD_ID, thingId);
  return Source.fromPublisher(collection.find(byThingId).projection(metadataFields))
      .map(mapThingMetadataToModel())
      .orElse(defaultThingMetadata());
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * {@inheritDoc}
 *
 * <p>Runs an upserting {@code updateOne} with the filter built by
 * {@code filterWithLowerThingRevisionOrLowerPolicyRevision}. The emitted value is {@code true}
 * when the update matched an existing document or reports an upserted ID, {@code false} otherwise.
 */
@Override
protected final Source<Boolean, NotUsed> save(final Thing thing, final long revision, final long policyRevision) {
  log.debug("Saving Thing with revision <{}> and policy revision <{}>: <{}>", revision, policyRevision, thing);
  final String thingId = getThingId(thing);
  final Bson filter = filterWithLowerThingRevisionOrLowerPolicyRevision(thingId, revision, policyRevision);
  final Document document = toUpdate(ThingDocumentMapper.toDocument(thing), revision, policyRevision);
  final UpdateOptions upsert = new UpdateOptions().upsert(true);
  return Source.fromPublisher(collection.updateOne(filter, document, upsert))
      .map(updateResult -> {
        // The upserted ID is only consulted when nothing was matched.
        if (updateResult.getMatchedCount() > 0) {
          return true;
        }
        return updateResult.getUpsertedId() != null;
      });
}

代码示例来源:origin: eclipse/ditto

/**
 * {@inheritDoc}
 *
 * <p>Runs an upserting {@code updateOne} with the filter built by
 * {@code filterWithLowerThingRevisionOrLowerPolicyRevision}. The emitted value is {@code true}
 * when the update matched an existing document or reports an upserted ID, {@code false} otherwise.
 */
@Override
protected final Source<Boolean, NotUsed> save(final Thing thing, final long revision, final long policyRevision) {
  log.debug("Saving Thing with revision <{}> and policy revision <{}>: <{}>", revision, policyRevision, thing);
  final String thingId = getThingId(thing);
  final Bson filter = filterWithLowerThingRevisionOrLowerPolicyRevision(thingId, revision, policyRevision);
  final Document document = toUpdate(ThingDocumentMapper.toDocument(thing), revision, policyRevision);
  final UpdateOptions upsert = new UpdateOptions().upsert(true);
  return Source.fromPublisher(collection.updateOne(filter, document, upsert))
      .map(updateResult -> {
        // The upserted ID is only consulted when nothing was matched.
        if (updateResult.getMatchedCount() > 0) {
          return true;
        }
        return updateResult.getUpsertedId() != null;
      });
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-persistence

/**
 * Retrieves the IDs of things that reference the tagged policy with a policy revision lower
 * than the revision of the tag.
 *
 * @param policyTag supplies the policy ID and the revision to compare against.
 * @return a source emitting the {@code FIELD_ID} value of each matching document.
 */
@Override
public Source<String, NotUsed> getOutdatedThingIds(final PolicyTag policyTag) {
  log.debug("Retrieving outdated Thing ids with policy tag: <{}>", policyTag);
  final Bson samePolicy = eq(FIELD_POLICY_ID, policyTag.getId());
  final Bson olderPolicyRevision = lt(FIELD_POLICY_REVISION, policyTag.getRevision());
  // Only the ID field is projected from the matching documents.
  final BsonDocument idOnly = new BsonDocument(FIELD_ID, new BsonInt32(1));
  return Source.fromPublisher(collection.find(and(samePolicy, olderPolicyRevision)).projection(idOnly))
      .map(doc -> doc.getString(FIELD_ID));
}

相关文章