协程 await关键字 (三)
摘要:深入讲解一下await关键字的作用
await是python 最常见的关键字,那么 await 都干了什么?以下面的代码为例,尽可能分析一下....
代码的执行流程
代码如下:
import datetime
import asyncio
async def say_after_time(delay, what, name):
await asyncio.sleep(delay)
async def main(name):
print("开始时间为", datetime.datetime.now())
# 打印出当前正在运行的task
# 可以看到task的状态为 running
task = asyncio.current_task()
blocking = getattr(task, '_asyncio_future_blocking', None)
print("当前task是否阻塞", blocking, "当前task名字: ", name)
print("当前正在运行的task: ", task, "当前task名字: ", name)
# 打印出当前循环中的全部task
task = asyncio.all_tasks()
for othertask in task:
blocking = getattr(othertask, '_asyncio_future_blocking', None)
print("其他task是否阻塞", blocking)
print("全部task", task, "当前task名字: ", name)
await say_after_time(1, "hello", name)
await say_after_time(5, "world", name)
print("结束时间为:", datetime.datetime.now())
loop = asyncio.get_event_loop()
task1 = loop.create_task(main("task1"))
task2 = loop.create_task(main("task2"))
tasklist = [task1, task2]
loop.run_until_complete(asyncio.wait(tasklist))
loop.close()
'''
运行结果:
开始时间为 2020-12-25 20:41:40.040016
当前task是否阻塞 False 当前task名字: task1
当前正在运行的task: <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:14>> 当前task名字: task1
其他task是否阻塞 False
其他task是否阻塞 False
其他task是否阻塞 False
全部task {<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:6>>, <Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:21>>} 当前task名字: task1
开始时间为 2020-12-25 20:41:40.040016
当前task是否阻塞 False 当前task名字: task2
当前正在运行的task: <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:14>> 当前task名字: task2
其他task是否阻塞 False
其他task是否阻塞 False
其他task是否阻塞 False
全部task {<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:21>>, <Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:24> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()]>>} 当前task名字: task2
结束时间为: 2020-12-25 20:41:46.040710
结束时间为: 2020-12-25 20:41:46.040710
'''
上面代码的执行顺序为:
第一步:
第二步:
第三步:
代码执行过程分析
首先 在老式的协程中是使用@asyncio.coroutine和yield from实现协程,在新式的协程中是使用async和await来实现协程。所以yield from 和 await 的作用应该是一样的,但是await 只能在 async 修饰的函数里面使用,yield from 就基本上哪都可以用,除了async修饰的函数里面。
其次我发现了一个有趣的现象:
由于task1是先执行,所以在task1执行时打印出task1的状态为:(ps: 数字14就代表 task1 下行 要执行的 代码的位置 是第14行**)
<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:14>>
当执行task2的时候,此时task1处于await asyncio.sleep(1)造成的睡眠中。所以此时task1的状态为:
<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:24> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()]>>
注意一下 wait_for,发现 task1 任务在等待一个future对象。
先来分析一下 asyncio.sleep 的实现
来查看下 asyncio.sleep()源代码:
async def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
if loop is None:
loop = events.get_event_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return await future
finally:
h.cancel()
注意这段代码 return await future ,future变量是一个future类型的对象,所以查看 future类的源码
def __await__(self):
# self._asyncio_future_blocking 我没看明白
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result() # May raise too.
__iter__ = __await__ # make compatible with 'yield from'.
有点意思了,注意 make compatible with 'yield from.',前面还刚刚讲到 await 和yield from 就是 新式协程 和 旧式协程 的关系。再联想到 yield from 和 __iter__ 的关系(ps: 不懂去看 为什么for循环可以遍历list:Python中迭代器与生成器 - 奥辰 - 博客园),没有理由不怀疑 await 的作用就是调用 future对象内部的 __await__ 方法。
这个时候又联想到了前面提到的 awaitable对象 。
awaitable对象
什么是awaitable对象——即可暂停等待的对象
有三类对象是可等待的,即 coroutines, Tasks, and Futures。
coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;
Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。
三者的关系,coroutine可以自动封装成task,而Task是Future的子类。
既然 coroutines, Tasks, and Futures 都是可暂停等待的对象, 于是构造一个协程对象 ,查看对象内部有没有 __await__ 方法,
import asyncio
async def say_after_time(delay, what, name):
await asyncio.sleep(delay)
a = say_after_time(1, "hello", "world")
print(dir(a))
'''
运行结果:
['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_origin', 'cr_running', 'send', 'throw']
'''
哦豁,协程对象果然有__await__方法,那么可以推论 当使用 async和await 构造的 say_after_time 协程函数 构造协程对象时,对象内部会被添加__await__方法。 ,当代码执行到 await say_after_time(1, "hello", name)时 ,await关键字 会调用 say_after_time(1, "hello", name)协程对象 里面的 __await__ 方法。
还有一个问题 task1 的状态 是
<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:24> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()]>>
可以看到 task1 wait_for(等待) 一个 future对象 。这个future 对象又是什么怎么来的?其实就是从asyncio.sleep()的下面这条件语句得来的。
try:
return await future
finally:
h.cancel()
至于 该future对象里面的 cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()],cb就是callback回调函数,当future的状态为finished的时候,会调用其回调函数,然后future的回调函数就是 task1 的 __wakeup函数(ps:下面有更详细的解释,往下看)。
关于 _asyncio_future_blocking 当值为true 时表示这个future对象还没有执行完毕。但也不算恰当。毕竟创建一个为pending的future对象, _asyncio_future_blocking 属性 默认就是False,而按照上面的推论,false应该是执行完毕的。所以我也搞不懂是干嘛的。
在 futures.py中找到的提示
# This field is used for a dual purpose:
# - Its presence is a marker to declare that a class implements
# the Future protocol (i.e. is intended to be duck-type compatible).
# The value must also be not-None, to enable a subclass to declare
# that it is not compatible by setting this to None.
# - It is set by __iter__() below so that Task._step() can tell
# the difference between
# `await Future()` or`yield from Future()` (correct) vs.
# `yield Future()` (incorrect).
对 await 执行过程分析
既然 await 这个关键字 分析完了,接下来来讲点更详细的细节,建议先看一下
python协程的原理。
代码依次执行:
loop = asyncio.get_event_loop()
task1 = loop.create_task(main("task1"))
task2 = loop.create_task(main("task2"))
tasklist = [task1, task2]
loop.run_until_complete(asyncio.wait(tasklist))
故事从loop.run_until_complete(asyncio.wait(tasklist))开始
asyncio.wait(tasklist) 是一个 协程函数,然后 loop.run_until_complete 在内部将这个协程函数封装为future,然后将_run_until_complete_cb()设置为这个future的回调函数。
<Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>
然后 在 asyncio.wait 函数内部 调用 task1, task1在内部调用 await say_after_time(1, "hello", name),这个时候进入 say_after_time函数内部,然后遇到了 await asyncio.sleep(delay),然后 进入 asyncio.sleep 函数内部执行,然后由于 asyncio.sleep 在内部返回一个 future,这个时候将 task1 的 __wakeup函数 加入到 future 的 self._callbacks = [] 中。由于loop.call_later设置的有睡眠时间,当时间到达,loop循环 调用 future对象的set_result 函数时,future对象被设置为finished状态,然后调用 self._callbacks 中的 __wakeup 函数,接着执行task1。
当task1和task2都执行完毕后,asyncio.wait(tasklist)封装的future 的状态变为 finished,然后调用future的回调函数_run_until_complete_cb()。这个函数会将loop._stopping 设置为 True,这个时候loop循环就会停止运行。
对于task1 的 __wakeup函数 加入到 future的 解释
task1 的 __wakeup函数 加入到 future 的 self._callbacks = [] 的简化代码:
# tasks.py
coro = self._coro
result = coro.send(None)
blocking = getattr(result, '_asyncio_future_blocking', None)
# blocking 不为 None,说明result是一个future对象或者task对象
if blocking is not None:
# Yielded Future must come from Future.__iter__().
# 返回的future必须和当前的task在同一个loop循环中
if result._loop is not self._loop:
self._loop.call_soon(
self._step,
RuntimeError(
'Task {!r} got Future {!r} attached to a '
'different loop'.format(self, result)))
elif blocking:
# task 不能 await 自己,无限套娃
if result is self:
self._loop.call_soon(
self._step,
RuntimeError(
'Task cannot await on itself: {!r}'.format(
self)))
else:
# 将当前task的self._wakeup,加入future对象的回调函数
result._asyncio_future_blocking = False
result.add_done_callback(self._wakeup)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
loop.run_until_complete 的执行原理
loop.run_until_complete 内部调用 loop.run_forever,不同的是 loop.run_until_complete(asyncio.wait(tasklist)) 会给 asyncio.wait(tasklist)封装成的future 添加 _run_until_complete_cb 回调函数,当 封装成的future 执行完毕, _run_until_complete_cb回调函数 将 loop._stopping 设置为 True,这个时候loop循环就会停止运行。
_run_until_complete_cb()的函数代码:
# base_events.py def _run_until_complete_cb(fut): if not fut.cancelled(): exc = fut.exception() if isinstance(exc, BaseException) and not isinstance(exc, Exception): # Issue #22429: run_forever() already finished, no need to # stop it. return futures._get_loop(fut).stop()
futures._get_loop(fut).stop()的函数代码:
# # base_events.py def stop(self): """Stop running the event loop. Every callback already scheduled will still run. This simply informs run_forever to stop looping after a complete iteration. """ self._stopping = True
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。