akka Sping Boot 控制器返回单声道的scala.concurrent.Future

bfhwhh0e  于 2022-11-06  发布在  Scala
关注(0)|答案(1)|浏览(114)

我正在运行一个Akka演员系统里面的Sping Boot 应用程序。我有一组演员运行。
我从我的Controller类调用我的服务类,它使用Actor ask模式向一个参与者发送一条消息并期待一个响应。

public Mono<Future<SportEventDetailed>> getEventBySportAndLeagueId(Integer sportId, Integer leagueId) {
    final ActorSelection actorSelection = bootstrapAkka.getActorSystem().actorSelection("/user/some/path");
    final ActorMessage message = new ActorMessage()

    final CompletionStage<Future<SportEventDetails>> futureCompletionStage = actorSelection.resolveOne(Duration.ofSeconds(2))
            .thenApplyAsync(actorRef ->
                        Patterns.ask(actorRef, message, 1000)
                        .map(v1 -> (SportEventDetails) v1, ExecutionContext.global())
                )
                .whenCompleteAsync((sportEventDetailsFuture, throwable) -> {
                    // Here sportEventDetailsFuture is of type scala.concurrent.Future
                    sportEventDetailsFuture.onComplete(v1 -> {
                        final SportEventDetails eventDetails = v1.get();
                        log.info("Thread: {} | v1.get - onComplete - SED: {}", Thread.currentThread(), eventDetails);
                        return eventDetails;
                    }, ExecutionContext.global());
                });

    return Mono.fromCompletionStage(futureCompletionStage);
}

虽然控制器代码非常简单,

@GetMapping(path = "{sportId}/{leagueId}")
public Mono<Future<SportEventDetails>> getEventsBySportAndLeagueId(@PathVariable("sportId") Integer sportId, @PathVariable("leagueId") Integer leagueId) {
    return eventService.getEventBySportAndLeagueId(sportId, leagueId);
}

当客户端调用此终结点时,它将获得{"success":true,"failure":false}null(作为字符串)。
我怀疑null响应的问题是scala.concurrent.Future在响应发送到客户端之前没有完成-但我不明白为什么它没有按时完成,因为我假设Mono会等待将来完成
这里的问题是Patterns.ask返回一个scala.concurrent.Future<SportEventDetails>,我找不到一种方法将scala Future转换为Java CompletableFuture<SportEventDetails>CompletionStage<SportEventDetails>
所以,我的问题是:当使用Akka的Patterns.ask(...)模型时,我如何将SportEventDetails的json表示返回给客户端?

9rnv2umw

9rnv2umw1#

FutureMonoCompletionStage是同一概念的三个实现,一个值可能还没有出现。您需要一种方法将它们转换为相同的类型,然后需要一种方法将嵌套的类型“扁平化”。Mono.fromCompletionStage是这样一种方法,它将CompletionStage转换为Mono
最容易的是将Future和被避免的变平完全:
在较新的Java版本(2.5.19或更高版本)中:ask重载使用java.time.Duration超时,您将得到CompletionStage<SportEventDetail>的返回值。还有ask重载使用ActorSelection,这样您就不必先解析,然后在解析完成时询问:

CompletionStage<SportEventDetail> futureSportEventDetails = 
  Patterns.ask(selection, message, Duration.ofSeconds(3))
return Mono.fromCompletionStage(futureSportEventDetails);

在Akka的旧版本(我认为是2.4.2和更高版本)中,您应该能够在akka.pattern.PatternsCS中找到类似的签名。
如果您使用的是更旧的版本,并且无法升级,则可能必须提供自己的从Future<T>CompletionStage<T>Mono<T>的转换器方法,以便在将来注册onComplete侦听器并完成目标类型的示例。

相关问题