akka.stream.javadsl.Flow.create()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.8k)|赞(0)|评价(0)|浏览(113)

本文整理了Java中akka.stream.javadsl.Flow.create()方法的一些代码示例,展示了Flow.create()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.create()方法的具体详情如下:
包路径:akka.stream.javadsl.Flow
类名称:Flow
方法名:create

Flow.create介绍

暂无

代码示例

代码示例来源:origin: com.typesafe.play/play_2.12

@Override
  public CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request) {
    return f.apply(request).thenApply(resultOrFlow -> {
      if (resultOrFlow.left.isPresent()) {
        return F.Either.Left(resultOrFlow.left.get());
      } else {
        Flow<Message, Message, ?> flow = AkkaStreams.bypassWith(
            Flow.<Message>create().collect(inMapper),
            play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2),
            resultOrFlow.right.get().map(outMapper::apply)
        );
        return F.Either.Right(flow);
      }
    });
  }
};

代码示例来源:origin: com.typesafe.play/play

@Override
  public CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request) {
    return f.apply(request).thenApply(resultOrFlow -> {
      if (resultOrFlow.left.isPresent()) {
        return F.Either.Left(resultOrFlow.left.get());
      } else {
        Flow<Message, Message, ?> flow = AkkaStreams.bypassWith(
            Flow.<Message>create().collect(inMapper),
            play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2),
            resultOrFlow.right.get().map(outMapper::apply)
        );
        return F.Either.Right(flow);
      }
    });
  }
};

代码示例来源:origin: com.typesafe.play/play_2.11

@Override
  public CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request) {
    return f.apply(request).thenApply(resultOrFlow -> {
      if (resultOrFlow.left.isPresent()) {
        return F.Either.Left(resultOrFlow.left.get());
      } else {
        Flow<Message, Message, ?> flow = AkkaStreams.bypassWith(
            Flow.<Message>create().collect(inMapper),
            play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2),
            resultOrFlow.right.get().map(outMapper::apply)
        );
        return F.Either.Right(flow);
      }
    });
  }
};

代码示例来源:origin: com.typesafe.play/play_2.12

/**
 * Bypass the given flow using the given splitter function.
 * <p>
 * If the splitter function returns Left, they will go through the flow.  If it returns Right, they will bypass the
 * flow.
 * <p>
 * Uses onlyFirstCanFinishMerge(2) by default.
 *
 * @param <In>     the In type parameter for Flow
 * @param <FlowIn> the FlowIn type parameter for the left branch in Either.
 * @param <Out>    the Out type parameter for Flow
 * @param flow     the original flow
 * @param splitter the splitter function to use
 *
 * @return the flow with a bypass.
 */
public static <In, FlowIn, Out> Flow<In, Out, ?> bypassWith(Function<In, F.Either<FlowIn, Out>> splitter,
                              Flow<FlowIn, Out, ?> flow) {
  return bypassWith(Flow.<In>create().map(splitter::apply),
      play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2), flow);
}

代码示例来源:origin: com.typesafe.play/play_2.11

/**
 * Bypass the given flow using the given splitter function.
 * <p>
 * If the splitter function returns Left, they will go through the flow.  If it returns Right, they will bypass the
 * flow.
 * <p>
 * Uses onlyFirstCanFinishMerge(2) by default.
 *
 * @param <In>     the In type parameter for Flow
 * @param <FlowIn> the FlowIn type parameter for the left branch in Either.
 * @param <Out>    the Out type parameter for Flow
 * @param flow     the original flow
 * @param splitter the splitter function to use
 *
 * @return the flow with a bypass.
 */
public static <In, FlowIn, Out> Flow<In, Out, ?> bypassWith(Function<In, F.Either<FlowIn, Out>> splitter,
                              Flow<FlowIn, Out, ?> flow) {
  return bypassWith(Flow.<In>create().map(splitter::apply),
      play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2), flow);
}

代码示例来源:origin: com.typesafe.play/play

/**
 * Bypass the given flow using the given splitter function.
 * <p>
 * If the splitter function returns Left, they will go through the flow.  If it returns Right, they will bypass the
 * flow.
 * <p>
 * Uses onlyFirstCanFinishMerge(2) by default.
 *
 * @param <In>     the In type parameter for Flow
 * @param <FlowIn> the FlowIn type parameter for the left branch in Either.
 * @param <Out>    the Out type parameter for Flow
 * @param flow     the original flow
 * @param splitter the splitter function to use
 *
 * @return the flow with a bypass.
 */
public static <In, FlowIn, Out> Flow<In, Out, ?> bypassWith(Function<In, F.Either<FlowIn, Out>> splitter,
                              Flow<FlowIn, Out, ?> flow) {
  return bypassWith(Flow.<In>create().map(splitter::apply),
      play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2), flow);
}

代码示例来源:origin: com.typesafe.play/play_2.12

UniformFanInShape<Out, Out> merge = builder.add(mergeStrategy);
Flow<F.Either<FlowIn, Out>, FlowIn, ?> collectIn = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
  if (x.left.isPresent()) {
    return x.left.get();
Flow<F.Either<FlowIn, Out>, Out, ?> collectOut = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
  if (x.right.isPresent()) {
    return x.right.get();

代码示例来源:origin: com.typesafe.play/play

UniformFanInShape<Out, Out> merge = builder.add(mergeStrategy);
Flow<F.Either<FlowIn, Out>, FlowIn, ?> collectIn = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
  if (x.left.isPresent()) {
    return x.left.get();
Flow<F.Either<FlowIn, Out>, Out, ?> collectOut = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
  if (x.right.isPresent()) {
    return x.right.get();

代码示例来源:origin: com.typesafe.play/play_2.11

UniformFanInShape<Out, Out> merge = builder.add(mergeStrategy);
Flow<F.Either<FlowIn, Out>, FlowIn, ?> collectIn = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
  if (x.left.isPresent()) {
    return x.left.get();
Flow<F.Either<FlowIn, Out>, Out, ?> collectOut = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
  if (x.right.isPresent()) {
    return x.right.get();

代码示例来源:origin: eclipse/ditto

/**
 * Create an activity checker if duration is not null and a pipe otherwise.
 *
 * @param interval how often to check for activity.
 * @param self reference to the actor.
 * @param <A> type of messages that prevents actor termination.
 * @return an activity checker.
 */
public static <A> Graph<FlowShape<A, A>, NotUsed> ofNullable(@Nullable final Duration interval,
    final ActorRef self) {
  return interval == null ? Flow.create() : of(interval, self);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-akka

/**
 * Create an activity checker if duration is not null and a pipe otherwise.
 *
 * @param interval how often to check for activity.
 * @param self reference to the actor.
 * @param <A> type of messages that prevents actor termination.
 * @return an activity checker.
 */
public static <A> Graph<FlowShape<A, A>, NotUsed> ofNullable(@Nullable final Duration interval,
    final ActorRef self) {
  return interval == null ? Flow.create() : of(interval, self);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-utils-akka

/**
   * Chain a collection of flows one after another.
   *
   * @param flows collection of flows.
   * @param <A> type of messages through the flows.
   * @return joined flow.
   */
  public static <A> Graph<FlowShape<A, A>, NotUsed> joinFlows(
      final Collection<Graph<FlowShape<A, A>, NotUsed>> flows) {

    Flow<A, A, NotUsed> overallFlow = Flow.create();

    for (Graph<FlowShape<A, A>, NotUsed> flow : flows) {
      overallFlow = overallFlow.via(flow);
    }

    return overallFlow;
  }
}

代码示例来源:origin: eclipse/ditto

/**
   * Chain a collection of flows one after another.
   *
   * @param flows collection of flows.
   * @param <A> type of messages through the flows.
   * @return joined flow.
   */
  public static <A> Graph<FlowShape<A, A>, NotUsed> joinFlows(
      final Collection<Graph<FlowShape<A, A>, NotUsed>> flows) {

    Flow<A, A, NotUsed> overallFlow = Flow.create();

    for (Graph<FlowShape<A, A>, NotUsed> flow : flows) {
      overallFlow = overallFlow.via(flow);
    }

    return overallFlow;
  }
}

代码示例来源:origin: wxyyxc1992/Backend-Boilerplates

/**
 * A handler that treats incoming messages as a name,
 * and responds with a greeting to that name
 */
public static Flow<Message, Message, NotUsed> greeter() {
  return
      Flow.<Message>create()
          .collect(new JavaPartialFunction<Message, Message>() {
            @Override
            public Message apply(Message msg, boolean isCheck) throws Exception {
              if (isCheck) {
                if (msg.isText()) {
                  return null;
                } else {
                  throw noMatch();
                }
              } else {
                return handleTextMessage(msg.asTextMessage());
              }
            }
          });
}

代码示例来源:origin: eclipse/ditto

/**
 * Create an Akka stream graph to dispatch {@code RetrieveThings} and {@code ThingSearchCommand}.
 *
 * @param actorContext context of this actor.
 * @param configReader the configReader for the concierge service.
 * @param pubSubMediator Akka pub-sub mediator.
 * @param enforcerShardRegion shard region of enforcer actors.
 * @return Akka stream graph to dispatch {@code RetrieveThings} and {@code ThingSearchCommand}.
 */
private static Graph<FlowShape<WithSender, WithSender>, NotUsed> dispatchGraph(
    final AbstractActor.ActorContext actorContext,
    final AbstractConciergeConfigReader configReader,
    final ActorRef pubSubMediator,
    final ActorRef enforcerShardRegion) {
  return Flow.<WithSender>create()
      .via(DispatcherActorCreator.dispatchSearchCommands(pubSubMediator))
      .via(DispatcherActorCreator.dispatchRetrieveThings(actorContext, configReader, enforcerShardRegion));
}

代码示例来源:origin: lagom/lagom-recipes

@Inject
 public StreamSubscriber(HelloService helloService, StreamRepository repository) {
  // Create a subscriber
  helloService.helloEvents().subscribe()
   // And subscribe to it with at least once processing semantics.
   .atLeastOnce(
    // Create a flow that emits a Done for each message it processes
    Flow.<HelloEvent>create().mapAsync(1, event -> {

     if (event instanceof HelloEvent.GreetingMessageChanged) {
      HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) event;
      // Update the message
      return repository.updateMessage(messageChanged.getName(), messageChanged.getMessage());

     } else {
      // Ignore all other events
      return CompletableFuture.completedFuture(Done.getInstance());
     }
    })
   );

 }
}

代码示例来源:origin: eclipse/ditto

.with(actorContext, log, enforcerExecutor);
return Flow.<WithSender>create()
    .via(ActivityChecker.ofNullable(activityCheckInterval, actorContext.self()))
    .via(PreEnforcer.fromFunction(actorContext.self(), preEnforcerFunction))

代码示例来源:origin: eclipse/ditto

/**
 * Create Akka actor configuration Props object with pre-enforcer.
 *
 * @param configReader the configReader for the concierge service.
 * @param pubSubMediator Akka pub-sub mediator.
 * @param enforcerShardRegion shard region of enforcer actors.
 * @param preEnforcer the pre-enforcer as graph.
 * @return the Props object.
 */
public static Props props(final AbstractConciergeConfigReader configReader, final ActorRef pubSubMediator,
    final ActorRef enforcerShardRegion,
    final Graph<FlowShape<WithSender, WithSender>, NotUsed> preEnforcer) {
  return GraphActor.partial(actorContext -> {
    DispatcherActorCreator.initActor(actorContext.self(), pubSubMediator);
    return Flow.<WithSender>create()
        .via(preEnforcer)
        .via(DispatcherActorCreator.dispatchGraph(actorContext, configReader, pubSubMediator,
            enforcerShardRegion));
  });
}

代码示例来源:origin: eclipse/ditto

/**
 * Create a processing unit from a function.
 *
 * @param self reference to the actor carrying the pre-enforcement.
 * @param processor function to call.
 * @return Akka stream graph.
 */
static Graph<FlowShape<WithSender, WithSender>, NotUsed> fromFunction(
    @Nullable final ActorRef self,
    final Function<WithDittoHeaders, CompletionStage<WithDittoHeaders>> processor) {
  final Attributes logLevels =
      Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(), Logging.ErrorLevel());
  final Flow<WithSender<WithDittoHeaders>, WithSender, NotUsed> flow =
      Flow.<WithSender<WithDittoHeaders>>create()
          .mapAsync(1, wrapped -> {
            final Supplier<CompletionStage<Object>> futureSupplier = () ->
                processor.apply(wrapped.getMessage())
                    .<Object>thenApply(result -> WithSender.of(result, wrapped.getSender()));
            return handleErrorNowOrLater(futureSupplier, wrapped, self);
          })
          .log("PreEnforcer")
          .withAttributes(logLevels)
          .flatMapConcat(PreEnforcer::keepResultAndLogErrors);
  return Pipe.joinUnhandledSink(
      Pipe.joinFilteredFlow(Filter.of(WithDittoHeaders.class), flow), unhandled());
}

代码示例来源:origin: eclipse/ditto

private Sink<Message, NotUsed> createSink(final Integer version, final String connectionCorrelationId,
    final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
    final ProtocolAdapter adapter) {
  return Flow.<Message>create()
      .filter(Message::isText)
      .map(Message::asTextMessage)
      .map(textMsg -> {
        if (textMsg.isStrict()) {
          return Source.single(textMsg.getStrictText());
        } else {
          return textMsg.getStreamedText();
        }
      })
      .flatMapConcat(textMsg -> textMsg.<String>fold("", (str1, str2) -> str1 + str2))
      .via(Flow.fromFunction(result -> {
        LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
            logger.debug("Received incoming WebSocket message: {}", result));
        return result;
      }))
      .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
          Logging.WarningLevel()))
      .filter(strictText -> processProtocolMessage(connectionAuthContext, connectionCorrelationId,
          strictText))
      .map(buildSignal(version, connectionCorrelationId, connectionAuthContext, additionalHeaders, adapter))
      .to(Sink.actorSubscriber(
          CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream)));
}

相关文章