协程 task and future(二)
摘要:记录一下关于协程中task and future常见的用法以及概念。
关于协程的概念
首先,在单核不开启超线程的前提下,在一段时间片内,cpu只能执行一个线程。而协程的关键就在于在这个线程中实现了一个loop循环,然后把task任务或者future一个一个加进去,然后遍历这个loop循环,当一个事件发生(比如使用loop.call_later设置几秒后执行这个task)时,loop就去执行这个task。所以协程对密集计算型是没啥用的,就一个时间片,你咋用都是这么多,对爬虫之类的IO密集型(因为不需要等待比如 http请求,你需要等待response,而response的时间是不一定的) 会提升较多。密集计算型应该使用多进程,因为cpu时间片的划分是以进程为单位的,所以多进程会占用更多的cpu时间片。
对于await关键词,await关键词。异步io的关键在于 await io操作,此时,当前携程就会被挂起,时间循环转而执行其他携程,但是要注意前面这句话,并不是说所有携程里的await都会导致当前携程的挂起,要看await后面跟的是什么,如果跟的是我们定义的携程,则会执行这个携程,如果是asyncio模块制作者定义的固有携程,比如模拟io操作的asyncio.sleep,以及io操作,比如网络io:asyncio.open_connection这些,才会挂起当前携程。参考自:await这个关键词以及asyncio.wait asyncio.gather - 扫驴 - 博客园。
await关键字和yield from关键字
其实await和yield from关键字没什么用,至于前面说的 当前携程就会被挂起,时间循环转而执行其他携程。简单来说,await io操作 时,程序会跳转到io操作,然后io资源准备好需要时间吧,正常情况下程序会阻塞到这一行代码(而异步模块比如 aiohtpp 之类的 这个模块是精心设计的,作者会采用非阻塞的模式,io没准备好没有关系,继续往下执行io操作包含的代码,准备好后麻烦操作系统通知我一下)。所以如果你在await io操作 的 io操作 中使用time.sleep(5)之类的该阻塞还是会阻塞的,时间循环就不会转而执行其他携程。
await io操作 其实就是遇到比如请求IO资源时设置一个回调函数,然后当IO资源准备好后之后,系统会给loop循环发送通知表示IO资源已经准备好,然后loop循环调用回调函数,而回调函数内部包含对IO资源的处理,就这样,这就是协程的大概原理。
awaitable对象
什么是awaitable对象——即可暂停等待的对象
有三类对象是可等待的,即 coroutines, Tasks, and Futures。
coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;
Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。
三者的关系,coroutine可以自动封装成task,而Task是Future的子类。
future对象
pycharm中使用
import asyncio.tasks
import asyncio.futures
然后按CTRL同时鼠标点击futures即可查看futures类的定义
关于future的状态
future对象有几个状态:
- Pending
- Running
- Done
- Cancelled
创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,状态为cancel。
相关使用:
import asyncio
def foo(future, result):
print(f"此时future的状态:{future}")
print(f"设置future的结果:{result}")
future.set_result(result)
print(f"此时future的状态:{future}")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
# all_done,"Future is done"为参数
# 函数原型: loop.call_soon(callback, *args, context=None)
loop.call_soon(foo, all_done, "Future is done!")
print("进入事件循环")
result = loop.run_until_complete(all_done)
print("返回结果", result)
finally:
print("关闭事件循环")
loop.close()
print("获取future的结果", all_done.result())
'''
运行结果:
进入事件循环
此时future的状态:<Future pending cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>
设置future的结果:Future is done!
此时future的状态:<Future finished result='Future is done!'>
返回结果 Future is done!
关闭事件循环
获取future的结果 Future is done!
'''
关于future的创建
三种方式:
创建future有三种方式,我感觉前面两种方式没什么区别,都不会自动把future添加到loop循环中去。future并没有什么用所以我们选择更加完善的task类,也就是future的子类。通过 loop.run_until_complete 函数创建的例如 loop.run_until_complete(asyncio.wait([coro1(),coro2()])) ,会在 asyncio.wait 函数内部把 coro1 封装成asyncio.ensure_future(coro1()) 。
asyncio.Future()
源码:// asyncio.Future应该就是 asyncio.futures.Future 对象的导出,应该就是创建一个Future类的对象,具体代码懒得找了。
使用:
import asyncio future = asyncio.Future() async def coro1(): print("wait 1 second") await asyncio.sleep(1) print("set_result") future.set_result('data') async def coro2(): result = await future print(result) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([ coro1(), coro2() ])) loop.close() ''' 运行结果: wait 1 second set_result data 运行结果解释:首先运行coro1,然后打印出"wait 1 second",遇到await asyncio.sleep(1),coro1睡眠1s放弃控制权。 coro2获得控制权,遇到await future,此时 future 的状态不是finished,所以coro2放弃控制权,相当于阻塞在await future。 经过1s后coro1被loop循环调用,然后调用set_result将future对象的状态设置为finished,然后coro1运行结束。 因为future的状态被设为finished, await future的阻塞被解除,await返回coro2设置的结果"data"并赋值为result变量,然后打印出"data"。 '''
asyncio.ensure_future
源码:
可以看到 asyncio.ensure_future 就是一个对 loop.create_task 的封装,实际上调用的还是loop.create_task,因此会自动把future添加到loop循环。def ensure_future(coro_or_future, *, loop=None): """Wrap a coroutine or an awaitable in a future. If the argument is a Future, it is returned directly. """ if coroutines.iscoroutine(coro_or_future): if loop is None: loop = events.get_event_loop() task = loop.create_task(coro_or_future) if task._source_traceback: del task._source_traceback[-1] return task
使用:
import asyncio async def hello1(a, b): print("Hello world 01 begin") await asyncio.sleep(3) # 模拟耗时任务3秒 print("Hello again 01 end") return a + b def callback(future): # 定义的回调函数 # 获得 future 完成后的结果 print(future.result()) loop = asyncio.get_event_loop() # 创建一个future对象 future = asyncio.ensure_future(hello1(10, 5)) future.add_done_callback(callback) loop.run_until_complete(future) loop.close() ''' 运行结果: Hello world 01 begin Hello again 01 end 15 '''
三种创建future方式的区别
loop.create_future(),asyncio.Future()仅仅是创建了一个future对象,并没有加入loop循环中,loop循环也就不会自动去调用future,因为loop不知道这个future对象存在,这也是,所以如果你要想把future对象加入loop循环中去,就必须使用await 去把 future加入到loop循环之中。而asyncio.ensure_future会把future对象直接加入到loop循环中,loop循环会根据条件自动调用future。
举例一:
import asyncio
async def hello(name):
print("正在执行future", name)
loop = asyncio.get_event_loop()
# 并没有使用 loop.run_until_complete 运行下面这个
# 证明 asyncio.ensure_future 会自动把协程函数添加进future
asyncio.ensure_future(hello("future1"))
future2 = asyncio.ensure_future(hello("future2"))
loop.run_until_complete(future2)
loop.close()
举例二:
import asyncio
def done_callback(future):
print("done")
print(future)
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
futu = asyncio.Future()
# 添加一个 done_callback 的回调函数
# 证明 asyncio.Future 不会自动被添加进回调函数
futu.add_done_callback(done_callback)
future2 = asyncio.ensure_future(do_some_work(3))
loop = asyncio.get_event_loop()
loop.run_until_complete(future2)
可以看到 future1 不会被执行。
关于future的判断
可以通过 asyncio.isfuture() 来判断一个对象是不是future对象,asyncio.isfuture()实际上就是 base_futures.isfuture 换了个名字.....
asyncio.isfuture()的源码:
# futures.py
isfuture = base_futures.isfuture
使用:
import asyncio
future = asyncio.Future()
print("判断是不是future对象", asyncio.isfuture(future))
'''
运行结果:
判断是不是future对象 True
'''
cancel
取消future的运行。
使用:
import asyncio
async def cancel_me():
print('cancel_me')
async def main(loop):
future = loop.create_future()
future.cancel()
try:
print("开始执行future")
await future
except asyncio.CancelledError:
print("future 被取消")
loop = asyncio.get_event_loop()
task = loop.create_task(main(loop))
loop.run_until_complete(task)
'''
运行结果:
开始执行future
future 被取消
'''
注意使用cancel函数可能会导致某些后果。例如下面的一个例子:
import asyncio
future = asyncio.Future()
async def coro1():
print("wait 1 second")
await asyncio.sleep(3)
future.cancel()
async def coro2():
print("coro2")
await asyncio.sleep(6)
# 当 coro2() 在内部调用 await future 时,
# 该协程函数会阻塞在 await future,loop循环执行其他task
# 只有当 future 的状态 被设置为 finished 时候
# await future 才会停止阻塞,继续向下执行
# 由于取消了future,这个future的状态为 Cancelled
# 只有当future状态为finished时,coro2才会继续往下运行
# 所以协程 coro2()会一直卡死在这里
# 这个理解不是很全面,但是这里篇幅限制,省略了
result = await future
print("future运行完成")
if future.cancelled():
print("future被取消")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()
'''
运行结果:
wait 1 second
coro2
'''
调用分析:当 coro2() 在 内部调用 await future 时,该协程函数会阻塞在 await future,loop循环执行其他task。
只有当 future 的状态 被设置为 finished 时候,await future 才会停止阻塞,继续向下执行。由于取消了future,这个时候 future状态 为 Cancelled,只有当 future状态 为 finished 时,coro2才会继续往下运行,所以协程 coro2()会一直 卡死 在这里。(ps: 这个理解不是很全面,但是这里篇幅限制,省略了)
__schedule_callbacks
这个其实就是保存 通过 add_done_callback 添加的回调函数 的列表。当调用 future.set_result 时会执行 __schedule_callbacks 里面保存的回调函数。
有兴趣去 Python线程、协程探究(3)——协程的调度实现 - 知乎 查看。
cancelled
判断future是否被取消,被取消了返回true。搞不懂有啥用,先放这。
用法:
import asyncio
async def cancel_me():
print('cancel_me')
async def main(loop):
future = loop.create_future()
future.cancel()
try:
print("开始执行future")
await future
except asyncio.CancelledError:
print("future 被取消")
print("future被取消", future.cancelled())
loop = asyncio.get_event_loop()
task = loop.create_task(main(loop))
loop.run_until_complete(task)
'''
运行结果:
开始执行future
future 被取消
future被取消 True
'''
done
判断future是否执行完成(处于finished的状态),完成返回真。
用法:
import asyncio
future = asyncio.Future()
async def coro1():
print("wait 1 second")
await asyncio.sleep(1)
print("set_result")
future.set_result('data')
async def coro2():
result = await future
print(result)
# 判断 future 是否处于finished状态
print("future任务是否执行完成", future.done())
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()
'''
运行结果:
wait 1 second
set_result
data
future任务是否执行完成 True
'''
result
获得future完成后的结果
用法:
import asyncio
async def hello1(a, b):
print("Hello world 01 begin")
await asyncio.sleep(3) # 模拟耗时任务3秒
print("Hello again 01 end")
return a + b
def callback(future): # 定义的回调函数
# 获得 future 完成后的结果
print(future.result())
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(hello1(10, 5))
future.add_done_callback(callback)
loop.run_until_complete(future)
loop.close()
'''
运行结果:
Hello world 01 begin
Hello again 01 end
15
'''
add_done_callback
通过add_done_callback添加回调函数,当添加多个回调函数的时候会按照先后顺序依次调用,先添加先调用,后添加后调用。
用法:
import asyncio
# 回调函数
# 会给done_callback返回一个future对象
def done_callback(future):
print("done")
print(future)
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
futu = asyncio.ensure_future(do_some_work(3))
# 添加一个done_callback的回调函数
futu.add_done_callback(done_callback)
loop = asyncio.get_event_loop()
loop.run_until_complete(futu)
'''
运行结果:
Waiting 3
done
<Task finished coro=<do_some_work() done, defined at E:/pycharm/daima/pycode/协程/future协程添加回调.py:8> result=None>
'''
remove_done_callback
通过remove_done_callback添加回调函数
用法:
import asyncio
# 回调函数
def done_callback(future):
print("我是回调函数1")
print(future)
# 回调函数2
def done_callback2(future):
print("我是回调函数2")
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
future = asyncio.ensure_future(do_some_work(3))
future.add_done_callback(done_callback)
future.add_done_callback(done_callback2)
# 删除回调函数1
future.remove_done_callback(done_callback)
loop = asyncio.get_event_loop()
loop.run_until_complete(future)
'''
运行结果:
Waiting 3
我是回调函数2
'''
set_result
将future设置为 finished 状态,同时设置一个结果,可以通过 future.result() 方法获得这个结果。
import asyncio
future = asyncio.Future()
async def coro1():
print("wait 1 second")
await asyncio.sleep(1)
print("set_result")
# 将future设置为finished状态,同时设置一个结果
future.set_result('data')
async def coro2():
result = await future
print(result)
print("future任务是否执行完成", future.done())
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()
'''
运行结果:
wait 1 second
set_result
data
future任务是否执行完成 True
'''
task对象
task是future的子类,所以task继承了future的属性和方法。
pycharm中使用
import asyncio.tasks
import asyncio.futures
然后按CTRL同时鼠标点击tasks即可查看tasks类的定义
关于task的创建
task的创建有三种方法, loop.create_task和asyncio.create_task,asyncio.Task。都会把创建的task加入到loop循环。
asyncio.create_task 其实在内部还是调用的 loop.create_task,但是 asyncio.create_task 必须有一个 正在运行的loop循环中 使用,否则会爆错 "RuntimeError: no running event loop"。
asyncio.Task 直接调用Task类进行创建。
loop.create_task
使用:
import asyncio async def main(): print("创建一个task") loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task) ''' 运行结果: 创建一个task '''
asyncio.create_task
源码:
asyncio.create_task 就是对 loop.create_task 的封装。def create_task(coro): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() return loop.create_task(coro)
使用:
import asyncio async def hello(): print("创建另一个task") async def main(): # 必须在一个正在运行的loop循环中创建,否则会爆错 "RuntimeError: no running event loop" asyncio.create_task(hello()) print("创建一个task") loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task) ''' 运行结果: 创建一个task 进入另一个task '''
asyncio.Task
使用:import asyncio async def hello(): print("hello") loop = asyncio.get_event_loop() task = asyncio.Task(hello()) loop.run_until_complete(task) ''' 运行结果: hello '''
关于task的状态
由于task是future的子类,所以task的状态应该就是和future是一样的。懒得翻源码了。
- Pending
- Running
- Done
- Cancelled
Pending 就绪态,Running 正在运行,Done 运行完成,Cancelled 任务被取消。
关于task的判断
没有找到 ansyncio自带的模块,这里直接用instance方法。
import asyncio
async def say_after_time(delay, what, name):
await asyncio.sleep(delay)
loop = asyncio.get_event_loop()
task = loop.create_task(say_after_time(1, "hello", "world"))
print(isinstance(task, asyncio.Task))
'''
运行结果:
True
'''
current_task 与 all_task
current与all_task都是静态方法,可以通过asyncio.current_task 与 asyncio.all_task 可以分别查看当前的正在运行的task和loop循环中的全部task。
import datetime
import asyncio
async def say_after_time(delay, what, name):
await asyncio.sleep(delay)
async def main(name):
print("开始时间为", datetime.datetime.now())
# 打印出当前正在运行的task
# 可以看到task的状态为 running
task = asyncio.current_task()
blocking = getattr(task, '_asyncio_future_blocking', None)
print("当前正在运行的task: ", task, "当前task名字: ", name)
# 打印出当前循环中的全部task
task = asyncio.all_tasks()
print("全部task", task, "当前task名字: ", name)
await say_after_time(1, "hello", name)
await say_after_time(5, "world", name)
print("结束时间为:", datetime.datetime.now())
loop = asyncio.get_event_loop()
task = loop.create_task(main("task1"))
task2 = loop.create_task(main("task2"))
tasklist = [task, task2]
loop.run_until_complete(asyncio.wait(tasklist))
loop.close()
'''
运行结果:
开始时间为 2020-12-25 15:22:33.225552
当前正在运行的task: <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:13>> 当前task名字: task1
全部task {<Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:17>>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:6>>} 当前task名字: task1
开始时间为 2020-12-25 15:22:33.225552
当前正在运行的task: <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:13>> 当前task名字: task2
全部task {<Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:20> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002D626E10EB8>()]>>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:17>>} 当前task名字: task2
结束时间为: 2020-12-25 15:22:39.227163
结束时间为: 2020-12-25 15:22:39.227163
'''
cancel
取消task
用法:
import asyncio
async def cancel_me():
print('cancel_me')
async def main(loop):
task = loop.create_task(cancel_me())
task.cancel()
try:
print("开始执行task")
await task
except asyncio.CancelledError:
print("task 被取消")
loop = asyncio.get_event_loop()
task = loop.create_task(main(loop))
loop.run_until_complete(task)
'''
运行结果:
开始执行task
task 被取消
'''
__step
源码中明明写的 __step 不知到图片中怎么弄成 _step 了。__step是整个协程的核心。生成器函数是怎么封装成task的,以及loop循环怎么操作生成器的,都是这个函数的功劳。
一个最简单的生成器函数例如下面:
def aa():
yield "1234"
首先这个生成器函数会在task类中执行 self._coro = coro ,然后self._loop.call_soon(self.__step, context=self._context)会把 self.__step 加入到loop循环中,这也是我们说的 loop.create_task 会把创建的 task 自动添加到loop循环的原因。然后loop循环就通过self.__step调动生成器函数。self.__step的内部实现就是
coro = self._coro
# self.__step内部通过send来调动生成器
result = coro.send(None)
可以看到 self.__step内部通过send方法来调动生成器。更多详细内容查看 python协程的原理 - keepnight
__wakeup
当在 一个协程内部 await 另一个协程时,当前协程的 __wakeup 被加入 另一个协程的回调函数。当 另一个协程 执行完毕时,会通过执行 另一个协程的回调函数 来接着执行当前协程。
__wakeup的源代码:
def __wakeup(self, future):
# 执行当前协程的__step函数
self.__step()
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。