akka.stream.javadsl.Flow.via()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(160)

本文整理了Java中akka.stream.javadsl.Flow.via()方法的一些代码示例,展示了Flow.via()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.via()方法的具体详情如下:
包路径:akka.stream.javadsl.Flow
类名称:Flow
方法名:via

Flow.via介绍

暂无

代码示例

代码示例来源:origin: com.typesafe.play/play_2.11

return splitter.via(Flow.fromGraph(GraphDSL.<FlowShape<F.Either<FlowIn, Out>, Out>>create(builder -> {

代码示例来源:origin: com.typesafe.play/play_2.12

return splitter.via(Flow.fromGraph(GraphDSL.<FlowShape<F.Either<FlowIn, Out>, Out>>create(builder -> {

代码示例来源:origin: com.typesafe.play/play

return splitter.via(Flow.fromGraph(GraphDSL.<FlowShape<F.Either<FlowIn, Out>, Out>>create(builder -> {

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-akka

/**
   * Chain a collection of flows one after another.
   *
   * @param flows collection of flows.
   * @param <A> type of messages through the flows.
   * @return joined flow.
   */
  public static <A> Graph<FlowShape<A, A>, NotUsed> joinFlows(
      final Collection<Graph<FlowShape<A, A>, NotUsed>> flows) {

    Flow<A, A, NotUsed> overallFlow = Flow.create();

    for (Graph<FlowShape<A, A>, NotUsed> flow : flows) {
      overallFlow = overallFlow.via(flow);
    }

    return overallFlow;
  }
}

代码示例来源:origin: eclipse/ditto

/**
   * Chain a collection of flows one after another.
   *
   * @param flows collection of flows.
   * @param <A> type of messages through the flows.
   * @return joined flow.
   */
  public static <A> Graph<FlowShape<A, A>, NotUsed> joinFlows(
      final Collection<Graph<FlowShape<A, A>, NotUsed>> flows) {

    Flow<A, A, NotUsed> overallFlow = Flow.create();

    for (Graph<FlowShape<A, A>, NotUsed> flow : flows) {
      overallFlow = overallFlow.via(flow);
    }

    return overallFlow;
  }
}

代码示例来源:origin: eclipse/ditto

/**
 * Create an Akka stream graph to dispatch {@code RetrieveThings} and {@code ThingSearchCommand}.
 *
 * @param actorContext context of this actor.
 * @param configReader the configReader for the concierge service.
 * @param pubSubMediator Akka pub-sub mediator.
 * @param enforcerShardRegion shard region of enforcer actors.
 * @return Akka stream graph to dispatch {@code RetrieveThings} and {@code ThingSearchCommand}.
 */
private static Graph<FlowShape<WithSender, WithSender>, NotUsed> dispatchGraph(
    final AbstractActor.ActorContext actorContext,
    final AbstractConciergeConfigReader configReader,
    final ActorRef pubSubMediator,
    final ActorRef enforcerShardRegion) {
  return Flow.<WithSender>create()
      .via(DispatcherActorCreator.dispatchSearchCommands(pubSubMediator))
      .via(DispatcherActorCreator.dispatchRetrieveThings(actorContext, configReader, enforcerShardRegion));
}

代码示例来源:origin: eclipse/ditto

/**
 * Create Akka actor configuration Props object with pre-enforcer.
 *
 * @param configReader the configReader for the concierge service.
 * @param pubSubMediator Akka pub-sub mediator.
 * @param enforcerShardRegion shard region of enforcer actors.
 * @param preEnforcer the pre-enforcer as graph.
 * @return the Props object.
 */
public static Props props(final AbstractConciergeConfigReader configReader, final ActorRef pubSubMediator,
    final ActorRef enforcerShardRegion,
    final Graph<FlowShape<WithSender, WithSender>, NotUsed> preEnforcer) {
  return GraphActor.partial(actorContext -> {
    DispatcherActorCreator.initActor(actorContext.self(), pubSubMediator);
    return Flow.<WithSender>create()
        .via(preEnforcer)
        .via(DispatcherActorCreator.dispatchGraph(actorContext, configReader, pubSubMediator,
            enforcerShardRegion));
  });
}

代码示例来源:origin: eclipse/ditto

.via(ActivityChecker.ofNullable(activityCheckInterval, actorContext.self()))
.via(PreEnforcer.fromFunction(actorContext.self(), preEnforcerFunction))
.via(Pipe.joinFlows(enforcementProviders.stream()
    .map(provider -> provider.toGraph(enforcementContext))
    .collect(Collectors.toList())));

代码示例来源:origin: com.typesafe.play/play-java_2.11

/**
 * Produces a Flow of escaped ByteString from a series of String elements.  Calls
 * out to Comet.flow internally.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<String, ByteString, NotUsed> string(String callbackName) {
  return Flow.of(String.class).map(str -> {
    return ByteString.fromString("'" + StringEscapeUtils.escapeEcmaScript(str) + "'");
  }).via(flow(callbackName));
}

代码示例来源:origin: com.typesafe.play/play-java

/**
 * Produces a Flow of escaped ByteString from a series of String elements.  Calls
 * out to Comet.flow internally.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<String, ByteString, NotUsed> string(String callbackName) {
  return Flow.of(String.class).map(str -> {
    return ByteString.fromString("'" + StringEscapeUtils.escapeEcmaScript(str) + "'");
  }).via(flow(callbackName));
}

代码示例来源:origin: com.typesafe.play/play-java

/**
 * Produces a flow of ByteString using `Json.stringify` from a Flow of JsonNode.  Calls
 * out to Comet.flow internally.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<JsonNode, ByteString, NotUsed> json(String callbackName) {
  return Flow.of(JsonNode.class).map(json -> {
    return ByteString.fromString(Json.stringify(json));
  }).via(flow(callbackName));
}

代码示例来源:origin: com.typesafe.play/play-java_2.11

/**
 * Produces a flow of ByteString using `Json.stringify` from a Flow of JsonNode.  Calls
 * out to Comet.flow internally.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<JsonNode, ByteString, NotUsed> json(String callbackName) {
  return Flow.of(JsonNode.class).map(json -> {
    return ByteString.fromString(Json.stringify(json));
  }).via(flow(callbackName));
}

代码示例来源:origin: com.typesafe.play/play-java_2.12

/**
 * Produces a Flow of escaped ByteString from a series of String elements.  Calls
 * out to Comet.flow internally.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<String, ByteString, NotUsed> string(String callbackName) {
  return Flow.of(String.class).map(str -> {
    return ByteString.fromString("'" + StringEscapeUtils.escapeEcmaScript(str) + "'");
  }).via(flow(callbackName));
}

代码示例来源:origin: com.typesafe.play/play-java_2.12

/**
 * Produces a flow of ByteString using `Json.stringify` from a Flow of JsonNode.  Calls
 * out to Comet.flow internally.
 *
 * @param callbackName the javascript callback method.
 * @return a flow of ByteString elements.
 */
public static Flow<JsonNode, ByteString, NotUsed> json(String callbackName) {
  return Flow.of(JsonNode.class).map(json -> {
    return ByteString.fromString(Json.stringify(json));
  }).via(flow(callbackName));
}

代码示例来源:origin: eclipse/ditto

private Sink<Message, NotUsed> createSink(final Integer version, final String connectionCorrelationId,
    final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
    final ProtocolAdapter adapter) {
  return Flow.<Message>create()
      .filter(Message::isText)
      .map(Message::asTextMessage)
      .map(textMsg -> {
        if (textMsg.isStrict()) {
          return Source.single(textMsg.getStrictText());
        } else {
          return textMsg.getStreamedText();
        }
      })
      .flatMapConcat(textMsg -> textMsg.<String>fold("", (str1, str2) -> str1 + str2))
      .via(Flow.fromFunction(result -> {
        LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
            logger.debug("Received incoming WebSocket message: {}", result));
        return result;
      }))
      .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
          Logging.WarningLevel()))
      .filter(strictText -> processProtocolMessage(connectionAuthContext, connectionCorrelationId,
          strictText))
      .map(buildSignal(version, connectionCorrelationId, connectionAuthContext, additionalHeaders, adapter))
      .to(Sink.actorSubscriber(
          CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream)));
}

相关文章