scala 如何正确中断zio测试中的纤维?

cedebl8k  于 5个月前  发布在  Scala
关注(0)|答案(1)|浏览(83)

我试图为一个Web服务编写一些测试,为测试分叉服务,击中一些端点,然后杀死它,但无法找到一种真正有效的方法:(
这里是我写的一个“模拟”服务,只是为了演示这个问题。它不是服务web请求,而是从输入队列读取字符串,并将它们复制到输出。同时也是一个客户端的服务器,保留对两个队列的引用,并提供一个ask方法,模拟客户端端点调用:

object EchoService {
  val nextId = new AtomicInteger(0)
  val manager = ZLayer.fromZIO(
    for {
      in <- Queue.bounded[String](10)
      out <- Queue.bounded[(Int, String)](10)
    } yield Mgr(in, out)
  )

  val forked = ZLayer.fromZIO(
    ZIO.acquireRelease(
      for {
        mgr <- ZIO.service[Mgr]
        f <- mgr.fork
      } yield f
    )(_.interrupt).withClock(ClockLive)
  )

  def ask(s: String) =
    ZIO.service[Mgr].flatMap(_.ask(s)).withClock(ClockLive)

  case class Mgr(in: Queue[String], out: Queue[(Int, String)]) {
    def ask(s: String) = in.offer(s) *> out.take.timeout(1.second)
    def fork = Svc(nextId.updateAndGet(_ + 1), in, out).serve.fork
  }

  case class Svc(id: Int, in: Queue[String], out: Queue[(Int, String)]) {
    val serve = ZStream.fromQueue(in)
      .takeWhile(_.nonEmpty)
      .timeout(5.seconds)
      .foreach(s => out.offer(id -> s))
      .withClock(ClockLive)
  }
}

字符串
这就是我试图测试它的方式:

def spec = suite("Tests")(
    suite("Forked service")(
      test("echo") { EchoService.ask("foo").map(x => assertTrue(x == Some(1, "foo"))).withClock(ClockLive) }
    ).provideSomeLayerShared(EchoService.manager >+> EchoService.forked)


这将永远挂起:((我所做的一些调试表明,.interrupt被调用,但永远不会返回)。
测试成功完成,然后它就挂起了。日志中有两条消息,我都不明白:
1.
警告:测试正在使用时间,但没有提前测试时钟,这可能导致测试挂起。使用TestClock.adjust手动提前时间。
我不明白这是在哪里使用TestClock。我把.withClock(ClockLive)放在了我能放的地方。还有什么遗漏的?有没有一种方法可以在整个规范中不使用TestClock,而不是在任何地方显式地覆盖它?
2.
警告:ZIO Test正在尝试关闭test_case_-665442250中的套件Tests - Forked服务的作用域,但关闭作用域的时间超过了60秒。这可能表明存在资源泄漏。
好吧,那么问题是为什么会发生这种情况。.interrupt不仅仅是杀死纤程,而是表现得更像java的中断,只是礼貌地要求代码退出?如果是这样,有没有其他方法可以用来无条件地停止执行纤程?
有趣的是,如果我使用provideSomeLayer而不是provideSomeLayerShared,行为发生了变化:它仍然挂起,但方式不同。测试没有完成(似乎甚至没有开始),日志中没有任何内容,除了以下内容:
测试测试-分叉服务-执行回显的时间超过1 m。如果这不是预期的,请考虑使用TestAspect.timeout来超时失控测试,以加快诊断速度。
在本例中,由于某种原因,它试图在开始测试之前关闭作用域(并且仍然挂在同一个地方试图调用interrupt)。

yqlxgs2m

yqlxgs2m1#

这是一个有趣的问题,因为它突出了几个问题的阴谋在一起。
要回答与在环境中使用实时时钟相关的第一个问题。为此,您可以在测试级别或套件级别使用TestAspect.withLiveClock

val spec = suite("MySpec")(
  // applies just to this test
  test("test1") { assertCompletes } @@ TestAspect.withLiveClock     
) @@ TestAspect.withLiveClock // applies to every test in the suite

字符串
您遇到的第二个问题与此层有关(根据上述答案删除了额外的时钟分配):

val forked = ZLayer.fromZIO(
  ZIO.acquireRelease(
    for {
      mgr <- ZIO.service[Mgr]
      f <- mgr.fork
    } yield f
  )(_.interrupt)
)


这里的问题是 interruptibilityscoping 的问题。在ZIO中,所有区域都有一个interruptibility状态,它指示该区域是否可以被主动中断,如果不能,那么运行时将在语义上阻塞,直到执行达到允许中断的点。
默认情况下,纤程总是可中断的,然而,在层中或在作用域的获取或发布阶段,可中断性被设置为不可中断。此外,可中断性状态由子纤程继承,这意味着在这些区域内产生的任何东西都将 * 也 * 具有该行为,除非以某种方式被覆盖。
这意味着

for {
  mgr <- ZIO.service[Mgr]
  f   <- mgr.fork 
} yield f


不间断地执行,因为它位于采集块中,并且同样地,底层流也不间断地执行。
在继续之前,这里还有一个问题,那就是作用域行为。ZIO.acquireRelease返回一个ZIO[Scope with R, E, A],但是,fork s默认绑定到它们的父作用域。这意味着通过在acquireRelease中分叉,分叉实际上只是绑定到获取,所以你从返回的理解它已经试图中断fork(它不能,因为它是不可中断的)。这也是传递ZIO的作用域到层之外,这是一个反模式。
相反,我建议做这样的事情:

val forked = ZLayer.scoped(ZIO.serviceWithZIO[Mgr](_.forkScoped))


forkScoped操作符返回一个其生命周期与作用域相连的纤程,然后ZLayer.scoped方法将该作用域与Layer的生命周期相连。
解决这个问题仍然会出现上面提到的试图中断不可中断区域的问题。处理这个问题取决于我们在该区域内到底做了什么,以及我们可以在哪里安全地注入中断。
我发现以下两种方式可以工作:
1.将uninterruptible添加到serve方法.foreach(s => out.offer(id -> s)).interruptible中,但是您需要小心使用这种方法,因为它可能会在ZIO的其余部分的可中断性中“戳出漏洞”。
1.向Mgr层添加终结器,关闭队列,并允许流在层释放时自然结束。

val manager = ZLayer.scoped(
  for {
    in  <- Queue.bounded[String](10)
    out <- Queue.bounded[(Int, String)](10)
    _   <- ZIO.addFinalizer(in.shutdown *> out.shutdown)
  } yield Mgr(in, out)
)


完整的代码示例:

object EchoSpec extends ZIOSpecDefault {
  val spec = suite("Echo")(
    test("mgr test") {
      EchoService.ask("hello").map(x => assertTrue(x.isDefined))
    }.provide(
      EchoService.manager,
      EchoService.forked
    ) @@ TestAspect.withLiveClock @@ TestAspect.timeout(10.seconds)
  )

  object EchoService {
    val nextId = new AtomicInteger(0)
    val manager = ZLayer.scoped(
      for {
        in  <- Queue.bounded[String](10)
        out <- Queue.bounded[(Int, String)](10)
        _   <- ZIO.addFinalizer(in.shutdown *> out.shutdown)
      } yield Mgr(in, out)
    )

    val forked = ZLayer.scoped(ZIO.serviceWithZIO[Mgr](_.fork)).unit

    def ask(s: String) =
      ZIO.serviceWithZIO[Mgr](_.ask(s))

    case class Mgr(in: Queue[String], out: Queue[(Int, String)]) {
      def ask(s: String) = in.offer(s) *> out.take.timeout(1.second)
      def fork           = Svc(nextId.updateAndGet(_ + 1), in, out).serve.forkScoped
    }

    case class Svc(id: Int, in: Queue[String], out: Queue[(Int, String)]) {
      val serve = ZStream
        .fromQueue(in)
        .takeWhile(_.nonEmpty)
        .timeout(5.seconds)
        .foreach(s => out.offer(id -> s))
    }
  }
}

相关问题