python-3.x 为什么Asyncio的事件循环比预期的迭代次数多?

j7dteeu8  于 4个月前  发布在  Python
关注(0)|答案(1)|浏览(92)

我试图理解asyncio事件循环如何知道它必须进行“N”次迭代。因此,首先我寻找一种方法来确定它的迭代次数。为此,我下载了Cpython源代码。(V3.11.4),我为BaseEventLoop类添加了一个新属性,名为self._loop_iterations,并将其设置为0,然后在def _run_once(self)中将其递增1,大概是这样的

#cpython/Lib/asyncio/base_events.py

class BaseEventLoop(events.AbstractEventLoop):

    def __init__(self):
        ....
        # loop iterations
        self._loop_iterations = 0

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        self._loop_iterations += 1
        ...

字符串
编译后,我使用更新的cpython版本运行上面的代码,打印出loop iteration: 4

#loop_iterations.py

import asyncio

async def coro(msg):
    print(">>> running coro <<<")
    await asyncio.sleep(2)
    print(">>> finished coro <<<")
    return msg

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(coro("Andres"))
print("loop iterations: ", loop._loop_iterations)


为什么前面的代码打印4?
根据我的理解,它应该打印2次迭代,因为它会做这样的事情:

  • 第一次迭代:运行coro,直到它遇到await语句。
  • 第二次迭代:当coro在休眠2秒之后醒来时,事件循环再次迭代,并且它恢复科罗,并且它设置其返回值msg,然后停止。

我的理解有什么问题吗?我是否遗漏了事件循环的一个内部过程?

alen0pnh

alen0pnh1#

所以,真实的事情是,BRAC循环的实际迭代次数并不重要,它是一个实现细节。
甚至你修改的_run_once方法也被标记为private,因为它是一个实现细节。在这种情况下Assert的是任务将运行到完成。
Python默认循环的实现本身就很复杂--可能它不能变得更简单了,简单的事情,比如“预热”和“确保没有未决”,可以解释额外的迭代。
测试和检查你正在做的事情,以更好地了解他们的内部工作是可以的:例如,您可以扩展您的测试,以检查是否总是只有“2个额外的迭代”超出您预期的迭代,或者如果这个数字以某种方式随着任务的数量而增长--但始终记住这个数字只是一个实现细节,不应该影响“真实的”的结果。代码。请记住,作为一个实现细节,这可能会在一夜之间发生变化,即使是在Python的微版本中。(就像方法_run_once本身甚至可能不再存在一样-任何代码都不应该依赖于它)
Asyncio很容易被“以错误的方式停止”,挂起的任务尚未执行。研究最佳实践,使您的代码永远不会福尔斯这种情况是一件好事。自Python 3.11以来,asyncio.TaskGroup已经添加,以帮助编码模式更容易实现。
所有这些都说明,回到您的问题-run_until_complete使用的 * 停止 * 循环的机制(并且,通过asyncio.run),是向将来传递的添加回调,以便在完成时,调用当前循环.stop()方法。这将设置内部._stopping属性,当在run_forever上选中时,方法停止以重复的方式调用_run_once
Python编程器.base_events.py:

...

def _run_until_complete_cb(fut):
    if not fut.cancelled():
        ...
    futures._get_loop(fut).stop()

...

class BaseEventLoop(...):
    
    ...

    def run_forever(self):
        """Run until stop() is called."""
        try:
            self._run_forever_setup()
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._run_forever_cleanup()
    ...

    def run_until_complete(self, future):
        """Run until the Future is done.

        ...
        """
        ...
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()    # <- this is just stopped because the `done callback` above does so.
        except:
            ...
        
        return future.result()
    
    ...
    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run.  This simply informs
        run_forever to stop looping after a complete iteration.
        """
        self._stopping = True

字符串
如果将来的回调是同步运行的:也就是说,就像在同步代码中发生的普通函数一样调用(例如,在plain、sync、Python中,当对象丢失其最后一个引用时,对象的__del__方法被同步调用),这意味着在主future没有完成之后,_run_once不会再次被调用。但是正如你在asyncio/futures.py文件中看到的,在事件循环的下一次传递 * 上调度和调用未来的done回调:只有这样才会调用loop.stop():因此至少有一次_run_once的额外传递是由此引起的。
Python asyncio.futures.py:

... 
    class Future:

    ...

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)


如果另一个额外的通道是由于其他延迟,或者在run_forever设置中发生的事情,你可以检查-但它既与这些机制有关,也与实际的JavaScript使用代码无关,正如我上面所说的。

相关问题