我试图为一个Web服务编写一些测试,为测试分叉服务,击中一些端点,然后杀死它,但无法找到一种真正有效的方法:(
这里是我写的一个“模拟”服务,只是为了演示这个问题。它不是服务web请求,而是从输入队列读取字符串,并将它们复制到输出。同时也是一个客户端的服务器,保留对两个队列的引用,并提供一个ask方法,模拟客户端端点调用:
object EchoService {
val nextId = new AtomicInteger(0)
val manager = ZLayer.fromZIO(
for {
in <- Queue.bounded[String](10)
out <- Queue.bounded[(Int, String)](10)
} yield Mgr(in, out)
)
val forked = ZLayer.fromZIO(
ZIO.acquireRelease(
for {
mgr <- ZIO.service[Mgr]
f <- mgr.fork
} yield f
)(_.interrupt).withClock(ClockLive)
)
def ask(s: String) =
ZIO.service[Mgr].flatMap(_.ask(s)).withClock(ClockLive)
case class Mgr(in: Queue[String], out: Queue[(Int, String)]) {
def ask(s: String) = in.offer(s) *> out.take.timeout(1.second)
def fork = Svc(nextId.updateAndGet(_ + 1), in, out).serve.fork
}
case class Svc(id: Int, in: Queue[String], out: Queue[(Int, String)]) {
val serve = ZStream.fromQueue(in)
.takeWhile(_.nonEmpty)
.timeout(5.seconds)
.foreach(s => out.offer(id -> s))
.withClock(ClockLive)
}
}
字符串
这就是我试图测试它的方式:
def spec = suite("Tests")(
suite("Forked service")(
test("echo") { EchoService.ask("foo").map(x => assertTrue(x == Some(1, "foo"))).withClock(ClockLive) }
).provideSomeLayerShared(EchoService.manager >+> EchoService.forked)
型
这将永远挂起:((我所做的一些调试表明,.interrupt被调用,但永远不会返回)。
测试成功完成,然后它就挂起了。日志中有两条消息,我都不明白:
1.
警告:测试正在使用时间,但没有提前测试时钟,这可能导致测试挂起。使用TestClock.adjust手动提前时间。
我不明白这是在哪里使用TestClock。我把.withClock(ClockLive)放在了我能放的地方。还有什么遗漏的?有没有一种方法可以在整个规范中不使用TestClock,而不是在任何地方显式地覆盖它?
2.
警告:ZIO Test正在尝试关闭test_case_-665442250中的套件Tests - Forked服务的作用域,但关闭作用域的时间超过了60秒。这可能表明存在资源泄漏。
好吧,那么问题是为什么会发生这种情况。.interrupt不仅仅是杀死纤程,而是表现得更像java的中断,只是礼貌地要求代码退出?如果是这样,有没有其他方法可以用来无条件地停止执行纤程?
有趣的是,如果我使用provideSomeLayer
而不是provideSomeLayerShared
,行为发生了变化:它仍然挂起,但方式不同。测试没有完成(似乎甚至没有开始),日志中没有任何内容,除了以下内容:
测试测试-分叉服务-执行回显的时间超过1 m。如果这不是预期的,请考虑使用TestAspect.timeout来超时失控测试,以加快诊断速度。
在本例中,由于某种原因,它试图在开始测试之前关闭作用域(并且仍然挂在同一个地方试图调用interrupt
)。
1条答案
按热度按时间yqlxgs2m1#
这是一个有趣的问题,因为它突出了几个问题的阴谋在一起。
要回答与在环境中使用实时时钟相关的第一个问题。为此,您可以在测试级别或套件级别使用
TestAspect.withLiveClock
:字符串
您遇到的第二个问题与此层有关(根据上述答案删除了额外的时钟分配):
型
这里的问题是 interruptibility 和 scoping 的问题。在
ZIO
中,所有区域都有一个interruptibility状态,它指示该区域是否可以被主动中断,如果不能,那么运行时将在语义上阻塞,直到执行达到允许中断的点。默认情况下,纤程总是可中断的,然而,在层中或在作用域的获取或发布阶段,可中断性被设置为不可中断。此外,可中断性状态由子纤程继承,这意味着在这些区域内产生的任何东西都将 * 也 * 具有该行为,除非以某种方式被覆盖。
这意味着
型
不间断地执行,因为它位于采集块中,并且同样地,底层流也不间断地执行。
在继续之前,这里还有一个问题,那就是作用域行为。
ZIO.acquireRelease
返回一个ZIO[Scope with R, E, A]
,但是,fork
s默认绑定到它们的父作用域。这意味着通过在acquireRelease
中分叉,分叉实际上只是绑定到获取,所以你从返回的理解它已经试图中断fork
(它不能,因为它是不可中断的)。这也是传递ZIO的作用域到层之外,这是一个反模式。相反,我建议做这样的事情:
型
forkScoped
操作符返回一个其生命周期与作用域相连的纤程,然后ZLayer.scoped
方法将该作用域与Layer
的生命周期相连。解决这个问题仍然会出现上面提到的试图中断不可中断区域的问题。处理这个问题取决于我们在该区域内到底做了什么,以及我们可以在哪里安全地注入中断。
我发现以下两种方式可以工作:
1.将
uninterruptible
添加到serve
方法.foreach(s => out.offer(id -> s)).interruptible
中,但是您需要小心使用这种方法,因为它可能会在ZIO的其余部分的可中断性中“戳出漏洞”。1.向
Mgr
层添加终结器,关闭队列,并允许流在层释放时自然结束。型
完整的代码示例:
型