在Akka Typed中聚合多个子参与者响应

jjhzyzn0  于 2022-11-23  发布在  其他
关注(0)|答案(1)|浏览(123)

我目前正在将一个Akka Classic应用程序移植到Akka Typed。我有以下组件:

  • HttpService -不是执行元
  • JobDispatcher -执行者
  • JobWorker -JobDispatcher的子执行元

JobDispatcher是编排作业的单例执行元。每个JobWorker负责一个“作业”,并知道该作业的状态。
HTTP服务将向JobDispatcher发出一个名为GetJobStatuses的询问。然后,JobDispatcher将询问每个JobWorkers的状态,将结果聚合到一个列表中,并回复HttpService。
我在Akka Classic中的做法是让JobDispatcher执行所有的请求,将Futures放入Futures列表中,然后将其转换为Future of Lists,当聚合Future完成后,我将结果发送到HttpService。

val statusFutures: Seq[Future[JobStatus]] = jobWorkers map (jobWorker => (jobWorker ? GetJobStatus).mapTo[JobStatus])
  val aggregateFuture: Future[Seq[SearchStatus]] = Future.sequence(statusFutures)

  val theSender = context.sender()
  aggregateFuture onComplete {
    case Success(jobStatuses: Seq[JobStatus]) => {
      theSender ! jobStatuses
    }
    case Failure(exception) => {
      theSender ! exception
    }
  }

所以,现在我们要转到Akka Typed,我们不应该使用Futures / onComplete,而是将Ask响应转换为返回给我们自己的消息(在本例中为JobDispatcher)。这对于我向另一个参与者请求一个响应的简单情况是相当简单的。但在本例中,我有一个完整的子参与者列表,我需要从中编译他们的响应。
我能想到的唯一方法是让JobDispatcher保存我正在等待的JobWorker响应列表的“状态,”跟踪哪些响应已经收到,当我收到所有响应时,将响应消息发送回HTTP服务。并且以某种方式标识每个状态对应的HTTP请求。
这比上面的聚合未来解决方案要复杂得多。
在Akka Typed中,处理这种情况的简单/正确方法是什么?

jckbn6z7

jckbn6z71#

文档建议在这种情况下使用每个会话的子执行元。子执行元只与一个HTTP请求相关联,它隐式地跟踪该状态的一个副本,并且还能够管理分散/收集作业的进程状态(例如,在超时和重试附近)。
同样值得注意的是,示例经典代码有一个巨大的bug:不要在涉及future的代码中调用sender。混合future和actors表面上看起来很容易,但也很容易变成只有在巧合下才能工作的东西(测试经常表现出这种巧合的行为)。

相关问题