我使用RestartSource.onFailuresWithBackoff在出现异常时重新启动Source,但如果收到特定的异常类型,我希望停止(取消)重新启动。例如:
RestartSource
.onFailuresWithBackoff(
minBackoff = 1,
maxBackoff = 5,
randomFactor = 0.2,
maxRestarts = 3
) { () =>
val responseFuture = doSomeAsyncTask().recover {
case SomeSpecialError =>
// I want to quit from the restarts
case NonFatal(ex) =>
// Re-throw so that the Source is restarted
throw ex
}
Source
.future(responseFuture)
.mapAsync(parallelism = 1)(Future.successful(_))
}
字符串
我尝试在 Package 的Source和RestartSource上设置一个Supervision策略,但事件从未到达它。出于这个原因,同样的解释也适用于尝试在Sink操作符上这样做。
2条答案
按热度按时间70gysomp1#
在
RestartSource.onFailuresWithBackoff
文档中,您所需要的只是完成源代码(不发出任何内容),以防止重新启动。一种实现方法是,如果
doSomeAsyncTask
导致Future[T]
,则将其Map到Future[Option[T]]
,然后将可区分的失败恢复为成功的None
。然后在流源中:举例来说:
字符串
上面假设
doSomeAsyncTask()
永远不会导致成功的null
,但是由于null
不应该通过Akka Stream传递,并且您没有处理它,这可能是一个合理安全的假设。rkttyhzu2#
我认为正确的处理方法是使用
RestartSettings
:字符串