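This article collects code examples for the Java method akka.stream.javadsl.Flow.create() and shows how it is used in practice. The examples are extracted from selected open-source projects published on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of Flow.create():
Fully qualified class: akka.stream.javadsl.Flow
Class name: Flow
Method name: create
Method description: none provided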
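Code example source: origin: com.typesafe.play/play_2.12 (the same snippet also appears in com.typesafe.play/play and com.typesafe.play/play_2.11)
@Override
public CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request) {
    return f.apply(request).thenApply(resultOrFlow -> {
        if (resultOrFlow.left.isPresent()) {
            // Left carries a Result: pass it through unchanged.
            return F.Either.Left(resultOrFlow.left.get());
        } else {
            // Start from an identity flow of Message and apply the inMapper partial function;
            // collect drops elements for which the partial function is not defined.
            Flow<Message, Message, ?> flow = AkkaStreams.bypassWith(
                Flow.<Message>create().collect(inMapper),
                play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2),
                resultOrFlow.right.get().map(outMapper::apply)
            );
            return F.Either.Right(flow);
        }
    });
}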
Code example source: origin: com.typesafe.play/play_2.12 (the same snippet also appears in com.typesafe.play/play_2.11 and com.typesafe.play/play)
/**
 * Bypass the given flow using the given splitter function.
 * <p>
 * If the splitter function returns Left, they will go through the flow. If it returns Right, they will bypass the
 * flow.
 * <p>
 * Uses onlyFirstCanFinishMerge(2) by default.
 *
 * @param <In> the In type parameter for Flow
 * @param <FlowIn> the FlowIn type parameter for the left branch in Either.
 * @param <Out> the Out type parameter for Flow
 * @param splitter the splitter function to use
 * @param flow the original flow
 *
 * @return the flow with a bypass.
 */
public static <In, FlowIn, Out> Flow<In, Out, ?> bypassWith(Function<In, F.Either<FlowIn, Out>> splitter,
        Flow<FlowIn, Out, ?> flow) {
    // Lift the splitter function into a flow by mapping it over an identity flow of In.
    return bypassWith(Flow.<In>create().map(splitter::apply),
            play.api.libs.streams.AkkaStreams.onlyFirstCanFinishMerge(2), flow);
}
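The routing rule described in the Javadoc above (Left goes through the flow, Right bypasses it) can be illustrated without Akka. What follows is a minimal, dependency-free sketch under stated assumptions: the `Either` class, the list-based `bypassWith`, and `demo` are hypothetical stand-ins, not Play's `F.Either` or `AkkaStreams.bypassWith`. Unlike a real stream merge, this model processes elements one by one and therefore trivially preserves input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class BypassSketch {

    /** Hypothetical minimal Either with Optional fields, loosely mirroring how the snippets read F.Either. */
    public static final class Either<L, R> {
        public final Optional<L> left;
        public final Optional<R> right;

        private Either(Optional<L> left, Optional<R> right) {
            this.left = left;
            this.right = right;
        }

        public static <L, R> Either<L, R> left(L value) {
            return new Either<>(Optional.of(value), Optional.empty());
        }

        public static <L, R> Either<L, R> right(R value) {
            return new Either<>(Optional.empty(), Optional.of(value));
        }
    }

    /** Applies the splitter to each input; Left values go through the flow, Right values skip it. */
    public static <In, FlowIn, Out> List<Out> bypassWith(
            List<In> inputs,
            Function<In, Either<FlowIn, Out>> splitter,
            Function<FlowIn, Out> flow) {
        List<Out> outputs = new ArrayList<>();
        for (In input : inputs) {
            Either<FlowIn, Out> routed = splitter.apply(input);
            if (routed.left.isPresent()) {
                outputs.add(flow.apply(routed.left.get()));
            } else {
                outputs.add(routed.right.get());
            }
        }
        return outputs;
    }

    /** Positive numbers are processed by the flow; everything else bypasses it. */
    public static List<String> demo() {
        Function<Integer, Either<Integer, String>> splitter = n ->
                n > 0 ? Either.<Integer, String>left(n)
                      : Either.<Integer, String>right("bypassed:" + n);
        Function<Integer, String> flow = n -> "processed:" + (n * 2);
        return bypassWith(Arrays.asList(1, -2, 3), splitter, flow);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [processed:2, bypassed:-2, processed:6]
    }
}
```

In the Play snippet the same split is expressed as a stream stage: `Flow.<In>create().map(splitter::apply)` turns the splitter function into a flow that emits Either values.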
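Code example source: origin: com.typesafe.play/play_2.12 (the same excerpt also appears in com.typesafe.play/play and com.typesafe.play/play_2.11)
UniformFanInShape<Out, Out> merge = builder.add(mergeStrategy);
// Keeps only the Left values of the Either.
Flow<F.Either<FlowIn, Out>, FlowIn, ?> collectIn = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
    if (x.left.isPresent()) {
        return x.left.get();
    // (the rest of this lambda is cut off in the excerpt)
// Keeps only the Right values of the Either.
Flow<F.Either<FlowIn, Out>, Out, ?> collectOut = Flow.<F.Either<FlowIn, Out>>create().collect(Scala.partialFunction(x -> {
    if (x.right.isPresent()) {
        return x.right.get();
    // (the excerpt ends here)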
Code example source: origin: eclipse/ditto (the same snippet also appears in org.eclipse.ditto/ditto-services-utils-akka)
/**
 * Create an activity checker if duration is not null and a pipe otherwise.
 *
 * @param interval how often to check for activity.
 * @param self reference to the actor.
 * @param <A> type of messages that prevents actor termination.
 * @return an activity checker.
 */
public static <A> Graph<FlowShape<A, A>, NotUsed> ofNullable(@Nullable final Duration interval,
        final ActorRef self) {
    // Flow.create() is the identity flow, so a null interval yields a plain pass-through.
    return interval == null ? Flow.create() : of(interval, self);
}
Code example source: origin: org.eclipse.ditto/ditto-services-utils-akka (the same snippet also appears in eclipse/ditto)
/**
 * Chain a collection of flows one after another.
 *
 * @param flows collection of flows.
 * @param <A> type of messages through the flows.
 * @return joined flow.
 */
public static <A> Graph<FlowShape<A, A>, NotUsed> joinFlows(
        final Collection<Graph<FlowShape<A, A>, NotUsed>> flows) {
    // Start from the identity flow and append each flow in iteration order.
    Flow<A, A, NotUsed> overallFlow = Flow.create();
    for (Graph<FlowShape<A, A>, NotUsed> flow : flows) {
        overallFlow = overallFlow.via(flow);
    }
    return overallFlow;
}
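The joinFlows snippet above folds a collection of stages onto an identity seed. The same pattern can be sketched with plain `java.util.function.Function`, where `Function.identity()` plays the role of `Flow.create()` and `andThen` plays the role of `via`. This is only an analogy for illustration; the class and method names below are hypothetical and no Akka types are involved.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class JoinFlowsSketch {

    /** Chains the stages in iteration order, starting from the identity function. */
    public static <A> Function<A, A> joinStages(Collection<Function<A, A>> stages) {
        Function<A, A> overall = Function.identity();
        for (Function<A, A> stage : stages) {
            overall = overall.andThen(stage);
        }
        return overall;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> addOne = x -> x + 1;
        Function<Integer, Integer> timesTen = x -> x * 10;
        List<Function<Integer, Integer>> stages = Arrays.asList(addOne, timesTen);

        // (2 + 1) * 10
        System.out.println(joinStages(stages).apply(2)); // 30

        // With no stages the result is the identity, so the input comes back unchanged.
        List<Function<Integer, Integer>> none = Collections.emptyList();
        System.out.println(joinStages(none).apply(7)); // 7
    }
}
```

Seeding the fold with an identity element means an empty collection needs no special case, which appears to be why the Ditto helper starts from `Flow.create()`.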
Code example source: origin: wxyyxc1992/Backend-Boilerplates
/**
 * A handler that treats incoming messages as a name,
 * and responds with a greeting to that name
 */
public static Flow<Message, Message, NotUsed> greeter() {
    return
        Flow.<Message>create()
            .collect(new JavaPartialFunction<Message, Message>() {
                @Override
                public Message apply(Message msg, boolean isCheck) throws Exception {
                    if (isCheck) {
                        // Definedness check only: the return value is ignored in this mode.
                        if (msg.isText()) {
                            return null;
                        } else {
                            throw noMatch();
                        }
                    } else {
                        return handleTextMessage(msg.asTextMessage());
                    }
                }
            });
}
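The greeter above relies on `collect`, which keeps only the elements a partial function is defined for and transforms them in the same step. A rough, dependency-free way to picture this is a function returning `Optional`, where an empty result stands in for `throw noMatch()`. The sketch below is an assumption-laden model: `collect`, `greet`, and `demo` are hypothetical helpers, and a `String` stands in for a text message.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CollectSketch {

    /** Keeps only inputs for which the partial function yields a value: filter and map in one step. */
    public static <A, B> List<B> collect(List<A> inputs, Function<A, Optional<B>> partial) {
        return inputs.stream()
                .map(partial)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    /** Hypothetical greeter, defined only for String inputs (standing in for text messages). */
    public static Optional<String> greet(Object message) {
        if (message instanceof String) {
            return Optional.of("Hello " + message + "!");
        }
        return Optional.empty(); // plays the role of `throw noMatch()`
    }

    public static List<String> demo() {
        List<Object> incoming = Arrays.asList("Alice", 42, "Bob");
        return collect(incoming, CollectSketch::greet);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [Hello Alice!, Hello Bob!]
    }
}
```

The non-text element (42 here) is silently dropped, which matches the effect of a binary WebSocket message reaching the greeter flow.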
Code example source: origin: eclipse/ditto
/**
 * Create an Akka stream graph to dispatch {@code RetrieveThings} and {@code ThingSearchCommand}.
 *
 * @param actorContext context of this actor.
 * @param configReader the configReader for the concierge service.
 * @param pubSubMediator Akka pub-sub mediator.
 * @param enforcerShardRegion shard region of enforcer actors.
 * @return Akka stream graph to dispatch {@code RetrieveThings} and {@code ThingSearchCommand}.
 */
private static Graph<FlowShape<WithSender, WithSender>, NotUsed> dispatchGraph(
        final AbstractActor.ActorContext actorContext,
        final AbstractConciergeConfigReader configReader,
        final ActorRef pubSubMediator,
        final ActorRef enforcerShardRegion) {
    return Flow.<WithSender>create()
            .via(DispatcherActorCreator.dispatchSearchCommands(pubSubMediator))
            .via(DispatcherActorCreator.dispatchRetrieveThings(actorContext, configReader, enforcerShardRegion));
}
Code example source: origin: lagom/lagom-recipes
@Inject
public StreamSubscriber(HelloService helloService, StreamRepository repository) {
    // Create a subscriber
    helloService.helloEvents().subscribe()
        // And subscribe to it with at least once processing semantics.
        .atLeastOnce(
            // Create a flow that emits a Done for each message it processes
            Flow.<HelloEvent>create().mapAsync(1, event -> {
                if (event instanceof HelloEvent.GreetingMessageChanged) {
                    HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) event;
                    // Update the message
                    return repository.updateMessage(messageChanged.getName(), messageChanged.getMessage());
                } else {
                    // Ignore all other events
                    return CompletableFuture.completedFuture(Done.getInstance());
                }
            })
        );
}
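The subscriber above uses `mapAsync(1, ...)`, so at most one asynchronous update is in flight at a time and events are handled in order. That ordering guarantee can be approximated with plain `CompletableFuture` chaining. The sketch below is only a model under stated assumptions: the `Done` enum, `processInOrder`, and the string-encoded events are hypothetical, and the at-least-once delivery that Lagom provides is not modeled at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class SequentialAsyncSketch {

    /** Hypothetical stand-in for akka.Done. */
    public enum Done { INSTANCE }

    /** Runs the async handler one event at a time; the next call starts only after the previous stage completes. */
    public static <T> CompletionStage<Done> processInOrder(
            List<T> events, Function<T, CompletionStage<Done>> handler) {
        CompletionStage<Done> chain = CompletableFuture.completedFuture(Done.INSTANCE);
        for (T event : events) {
            chain = chain.thenCompose(previous -> handler.apply(event));
        }
        return chain;
    }

    public static List<String> demo() {
        List<String> store = Collections.synchronizedList(new ArrayList<String>());
        Function<String, CompletionStage<Done>> handler = event -> {
            if (event.startsWith("changed:")) {
                // Asynchronous "repository update" for the events we care about.
                return CompletableFuture.supplyAsync(() -> {
                    store.add(event.substring("changed:".length()));
                    return Done.INSTANCE;
                });
            }
            // Ignore all other events, but still signal completion.
            return CompletableFuture.completedFuture(Done.INSTANCE);
        };
        processInOrder(Arrays.asList("changed:hello", "other", "changed:hi"), handler)
                .toCompletableFuture()
                .join();
        return new ArrayList<>(store);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [hello, hi]
    }
}
```

Returning an already completed future for ignored events mirrors the `CompletableFuture.completedFuture(Done.getInstance())` branch in the original, so every event produces exactly one completion signal.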
Code example source: origin: eclipse/ditto
// (the excerpt starts mid-expression)
        .with(actorContext, log, enforcerExecutor);
return Flow.<WithSender>create()
        .via(ActivityChecker.ofNullable(activityCheckInterval, actorContext.self()))
        .via(PreEnforcer.fromFunction(actorContext.self(), preEnforcerFunction))
// (the excerpt ends here)
Code example source: origin: eclipse/ditto
/**
 * Create Akka actor configuration Props object with pre-enforcer.
 *
 * @param configReader the configReader for the concierge service.
 * @param pubSubMediator Akka pub-sub mediator.
 * @param enforcerShardRegion shard region of enforcer actors.
 * @param preEnforcer the pre-enforcer as graph.
 * @return the Props object.
 */
public static Props props(final AbstractConciergeConfigReader configReader, final ActorRef pubSubMediator,
        final ActorRef enforcerShardRegion,
        final Graph<FlowShape<WithSender, WithSender>, NotUsed> preEnforcer) {
    return GraphActor.partial(actorContext -> {
        DispatcherActorCreator.initActor(actorContext.self(), pubSubMediator);
        // Run the pre-enforcer first, then the dispatch graph.
        return Flow.<WithSender>create()
                .via(preEnforcer)
                .via(DispatcherActorCreator.dispatchGraph(actorContext, configReader, pubSubMediator,
                        enforcerShardRegion));
    });
}
Code example source: origin: eclipse/ditto
/**
 * Create a processing unit from a function.
 *
 * @param self reference to the actor carrying the pre-enforcement.
 * @param processor function to call.
 * @return Akka stream graph.
 */
static Graph<FlowShape<WithSender, WithSender>, NotUsed> fromFunction(
        @Nullable final ActorRef self,
        final Function<WithDittoHeaders, CompletionStage<WithDittoHeaders>> processor) {
    final Attributes logLevels =
            Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(), Logging.ErrorLevel());
    final Flow<WithSender<WithDittoHeaders>, WithSender, NotUsed> flow =
            Flow.<WithSender<WithDittoHeaders>>create()
                    // Parallelism 1: process one wrapped message at a time.
                    .mapAsync(1, wrapped -> {
                        final Supplier<CompletionStage<Object>> futureSupplier = () ->
                                processor.apply(wrapped.getMessage())
                                        .<Object>thenApply(result -> WithSender.of(result, wrapped.getSender()));
                        return handleErrorNowOrLater(futureSupplier, wrapped, self);
                    })
                    .log("PreEnforcer")
                    .withAttributes(logLevels)
                    .flatMapConcat(PreEnforcer::keepResultAndLogErrors);
    return Pipe.joinUnhandledSink(
            Pipe.joinFilteredFlow(Filter.of(WithDittoHeaders.class), flow), unhandled());
}
Code example source: origin: eclipse/ditto
private Sink<Message, NotUsed> createSink(final Integer version, final String connectionCorrelationId,
        final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
        final ProtocolAdapter adapter) {
    return Flow.<Message>create()
            // Only text messages are processed.
            .filter(Message::isText)
            .map(Message::asTextMessage)
            // Represent strict and streamed text uniformly as a Source of strings.
            .map(textMsg -> {
                if (textMsg.isStrict()) {
                    return Source.single(textMsg.getStrictText());
                } else {
                    return textMsg.getStreamedText();
                }
            })
            // Concatenate all chunks of one message into a single string.
            .flatMapConcat(textMsg -> textMsg.<String>fold("", (str1, str2) -> str1 + str2))
            .via(Flow.fromFunction(result -> {
                LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
                        logger.debug("Received incoming WebSocket message: {}", result));
                return result;
            }))
            .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
                    Logging.WarningLevel()))
            .filter(strictText -> processProtocolMessage(connectionAuthContext, connectionCorrelationId,
                    strictText))
            .map(buildSignal(version, connectionCorrelationId, connectionAuthContext, additionalHeaders, adapter))
            .to(Sink.actorSubscriber(
                    CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream)));
}
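One step in the sink above treats strict and streamed text the same way: both become a sequence of string chunks that is folded into one string starting from the empty string. A small dependency-free sketch of that step follows. The `Text` class and its factory methods are hypothetical stand-ins for a WebSocket text message, and `java.util.stream.Stream` replaces the Akka `Source`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TextFoldSketch {

    /** Hypothetical text message: either strict (one complete string) or streamed (several chunks). */
    public static final class Text {
        private final String strictText;   // set only for strict messages
        private final List<String> chunks; // set only for streamed messages

        private Text(String strictText, List<String> chunks) {
            this.strictText = strictText;
            this.chunks = chunks;
        }

        public static Text strict(String text) {
            return new Text(text, null);
        }

        public static Text streamed(String... parts) {
            return new Text(null, Arrays.asList(parts));
        }

        public boolean isStrict() {
            return strictText != null;
        }
    }

    /** Views both kinds of message as a stream of chunks and folds them from the empty string. */
    public static String toFullText(Text message) {
        Stream<String> parts = message.isStrict()
                ? Stream.of(message.strictText)
                : message.chunks.stream();
        return parts.reduce("", (str1, str2) -> str1 + str2);
    }

    public static List<String> demo() {
        return Stream.of(Text.strict("ping"), Text.streamed("hel", "lo"))
                .map(TextFoldSketch::toFullText)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ping, hello]
    }
}
```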