scala Akka Streams RestartSource.onFailuresWithBackoff停止条件

0dxa2lsx  于 6个月前  发布在  Scala
关注(0)|答案(2)|浏览(60)

我使用RestartSource.onFailuresWithBackoff在出现异常时重新启动Source,但如果收到特定的异常类型,我希望停止(取消)重新启动。例如:

RestartSource
  .onFailuresWithBackoff(
    minBackoff = 1,
    maxBackoff = 5,
    randomFactor = 0.2,
    maxRestarts = 3
  ) { () => 
    val responseFuture = doSomeAsyncTask().recover {
      case SomeSpecialError =>
        // I want to quit from the restarts
      case NonFatal(ex) => 
        // Re-throw so that the Source is restarted
        throw ex
    }

    Source
      .future(responseFuture)
      .mapAsync(parallelism = 1)(Future.successful(_))
}

字符串
我尝试在 Package 的Source和RestartSource上设置一个Supervision策略,但事件从未到达它。出于这个原因,同样的解释也适用于尝试在Sink操作符上这样做。

70gysomp

70gysomp1#

RestartSource.onFailuresWithBackoff文档中,您所需要的只是完成源代码(不发出任何内容),以防止重新启动。
一种实现方法是,如果doSomeAsyncTask导致Future[T],则将其Map到Future[Option[T]],然后将可区分的失败恢复为成功的None。然后在流源中:

  • 如果原始的future由于其他异常而失败,则源将失败并重新启动
  • 如果最初的future由于可分辨异常而失败,我们将其过滤掉,这样源代码就完成了,而不发出任何东西。
  • 如果最初的future成功了,我们就正常地传递该值

举例来说:

RestartSource.onFailuresWithBackoff(
  // yada yada yada
) { () =>
  val baseFuture = doSomeAsyncTask().map(Option(_))
  val tweakedFuture = baseFuture.recoverWith {
    case SomeSpecialError => Future.successful(None)
    case NonFatal(e) => baseFuture  // including for clarity
  }

  Source.future(tweakedFuture)
    .mapConcat(_.toList)  // swallows the None arising from `SomeSpecialError`
    // the mapAsync in your question is pointless, so I've omitted it,
    // but if it's a placeholder for something else, you'd put it here
}

字符串
上面假设doSomeAsyncTask()永远不会导致成功的null,但是由于null不应该通过Akka Stream传递,并且您没有处理它,这可能是一个合理安全的假设。

rkttyhzu

rkttyhzu2#

我认为正确的处理方法是使用RestartSettings

private val restartSettings = RestartSettings(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
)
  .withMaxRestarts(3, 1.hour)  // should always reach the 3 counts within 1 hour
  .withRestartOn {
    case sse: SomeSpecialError => 
      log error ("Didn't restart because of SomeSpecialError", sse)
      false  // don't restart
    case _ => 
      true  // restart
  }

RestartSource.onFailuresWithBackoff(restartSettings) { () =>
    Source
      .future(responseFuture)
      .mapAsync(parallelism = 1)(Future.successful(_))
}

字符串

相关问题