摘要:记录一些python协程常见的用法,以及一些注意点。

Task was destroyed but it is pending!

如果loop循环关闭的时候,还有协程处于pending状态,控制台会报错,Task was destroyed but it is pending!。

代码如下:

import asyncio
async def main():
    print("创建一个task")
loop = asyncio.get_event_loop()
task = loop.create_task(main())
# Task was destroyed but it is pending!
loop.close()

'''
运行结果:
ask was destroyed but it is pending!
task: <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的create_task用法2.py:4>>
sys:1: RuntimeWarning: coroutine 'main' was never awaited
'''

asyncio.as_completed 和 asyncio.wait 和 asyncio.gather 三者之间的区别

  • 执行过程的区别

    1. asyncio.as_completedasyncio.wait 封装的协程函数开始执行的顺序是无序的,不一定先执行哪个协程。
    2. asyncio.gather封装的协程函数从左往右依次执行,当左边的协程函数执行await asyncio.sleep(1)或者其他类似的时候,才会执行第二个协程。gather 起聚合的作用,先把所有传入的协程函数分别使用ensure_future包装成future对象,然后这些future对象再被合起来组成一个总的future对象,因为 loop.run_until_complete 只接受单个 future。

      asyncio.as_completedasyncio.wait 函数在内部使用一个set保存它创建的Task实例。因为set是无序的,所以这也就是我们的任务不是顺序执行的原因。

    3. asyncio.wait先把所有传入的协程函数分别使用ensure_future包装起来,然后这些future对象再被合起来组成一个总的future对象。
    4. asyncio.as_completed也是先把所有传入的协程函数分别使用ensure_future包装起来。
    5. asyncio.as_completed 不会阻塞,直接返回。await asyncio.waitawait asyncio.gather 都会阻塞直到全部协程完成才会返回。
  • 返回值的区别

    asyncio.as_completed 会返回一个生成器,然后 await 这个生成器,会返回协程函数的运行结果,哪个协程函数先执行完先返回哪个结果。
    await asyncio.wait 的返回值是一个元组,包括两个集合,分别表示已完成和未完成的任务也就是(done, pending),左边是finished状态的协程,右边是pending状态的协程。
    await asyncio.gather 会返回一个结果列表,里面包含协程函数的运行结果,返回结果是有序的,按照传入协程函数的顺序,顺序输出。

  • 源代码

    • asyncio.as_completed
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    • asyncio.wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
    • asyncio.gather
      asyncio.gather这个有点特别,在 Python 3.4 中 asyncio.gather 封装的协程函数开始执行的顺序是无序的,而在Python 3.7中asyncio.gather封装的协程函数从左往右依次执行
      Python 3.4中的源码

      for arg in set(coros_or_futures):

      Python 3.7中的源码

      for arg in coros_or_futures:

关于回调函数参数的问题

代码一:

import asyncio
import time
from functools import partial
async def get_html(url):
    await asyncio.sleep(2)
    return "body"


def callback1(url, future): # 传入值的时候,future必须在最后一个
    print("我是传入回调函数的参数", url)
    print("我是协程函数的运行结果", future.result())

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.imooc.com"))
    task.add_done_callback(partial(callback1, "http://www.imooc.com"))
    loop.run_until_complete(task)

代码二:

import asyncio
import time
from functools import partial
async def get_html(url):
    await asyncio.sleep(2)
    return "body"


def callback1(future): 
    print("我是协程函数的运行结果", future.result())

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.imooc.com"))
    task.add_done_callback(callback1)
    loop.run_until_complete(task)

为什么callback1(url, future)而不是callback1(future, url)?查看源码之后会发现很简单.....

核心代码:

# futures.py
def add_done_callback(self, fn):
    if self._state != _PENDING:
        self._loop.call_soon(fn, self)
    else:
        self._callbacks.append(fn)
        
        
# base_events.py        
def call_soon(self, callback, *args):
    handle = self._call_soon(callback, args)
    return handle

# base_events.py
def _call_soon(self, callback, args):
    handle = events.Handle(callback, args, self)
    self._ready.append(handle)
    return handle

# events.py
#  Handle类
def __init__(self, callback, args, loop):
    self._args = args

def _run(self):
    self._callback(*self._args)

partial(callback1, "http://www.imooc.com")运行之后,self._callback(*self._args)就相当于callback1("http://www.imooc.com",*self._args)