通过 asyncio 实现基于协程的并发编程

2019-07-11 16:37:33   最后更新: 2019-07-11 16:44:56   访问数量:93




此前的文章中,我们详细介绍了 python 中的协程

python 的协程

 

协程是在用户进程中,按照用户预先设定的执行流程进行上下文切换,从而在开销远小于多线程/多进程并发的条件下实现程序的并发执行

asyncio,tornado 和 gevent 在 python 原有协程机制的基础上封装了更为易用的高层次 api,本文我们就来详细介绍 asyncio 包基于协程实现的异步 IO

 

 

asyncio 实现了一个协程库,他具有下面几个组件:

 

事件循环 -- event_loop

协程是在用户进程中进行上下文切换实现的,与多线程/多进程并发执行的本质区别是没有操作系统来执行调度

在 asyncio 中,事件循环就充当了操作系统的角色,负责调度在事件循环上注册的协程函数

 

协程 -- coroutine

协程对象是通过 async 关键字定义的函数,他需要被注册到事件循环上,在事件循环执行过程中进行调用

 

任务 -- task

一个协程对象就是一个原生可以挂起的函数

任务时对协程的进一步封装,其中记录了任务的状态等信息

 

future

我们此前已经介绍过 future 对象,他是一个用来表示将来会被执行的任务的对象

正如我们之前提到,python 标准库中,在两个包中封装了 Future 类:

  1. concurrent
  2. asyncio

 

在两个包中封装的 Future 类本质上和用法上都是非常接近的

 

async/await 关键字

async 关键字用于定义一个协程方法

await 关键字则用于挂起阻塞的异步调用接口

他们都是 python3.5 引入的关键字

 

可等待对象

可以被加入事件循环的对象就是可等待对象,分为三种类型:

  1. async 关键字标识的协程对象
  2. Task 对象
  3. Future 对象

 

import time import asyncio async def do_some_work(x): print('Waiting: ', x) if __name__ == '__main__': now = lambda: time.time() start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('TIME: ', now() - start)

 

 

上面的例子中,我们首先通过 asyncio 的 get_event_loop 方法创建了事件循环,然后将被 async 关键字标记的协程方法注册到事件循环中,事件循环负责调起该方法

方法顺利执行,打印出了:

Waiting:  2

TIME:  0.002991914749145508

 

python3.7 的优化

创建事件循环看上去非常繁琐,python3.7 引入了 asyncio.run 方法,让你可以省去这个操作:

import time import asyncio async def do_some_work(x): print('Waiting: ', x) if __name__ == '__main__': now = lambda: time.time() start = now() asyncio.run(do_some_work(2)) print('TIME: ', now() - start)

 

 

由于该方法目前属于“暂定基准状态”,所以本文仍然使用上述事件循环方式启动和运行协程

 

asyncio 中 Task 对象是 Future 对象的子类

上面的例子中,事件循环的 run_until_complete 方法实际上是将我们的协程方法封装成了 Task 对象并运行

我们也可以显式手动创建 Task 对象,这样最大的好处在于我们可以对协程方法进行更为灵活的控制,例如监控任务执行的状态等

 

任务的状态

Task 和 Future 对象一样,拥有四种执行状态:

  1. Pending -- 等待执行
  2. Running -- 执行中
  3. Done -- 完成执行
  4. Canceled -- 已被取消

 

创建 Task 对象

我们有三种方法创建一个 task 对象,下文中,变量 coroutine 表示我们的协程方法对象:

  1. 通过事件循环对象 -- loop.create_task(coroutine)
  2. asyncio.ensure_future --  asyncio.ensure_future(coroutine)
  3. asyncio.create_task -- 仅限 python3.7 及以上,asyncio.ensure_future(coroutine)

 

示例

import asyncio import logging import time async def say_after(delay, what): await asyncio.sleep(delay) logging.info(what) return what if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') print(f"started at {time.strftime('%X')}") loop = asyncio.get_event_loop() task = loop.create_task(say_after(2, 'world')) logging.info(task) loop.run_until_complete(task) logging.info(task) print(f"finished at {time.strftime('%X')}")

 

 

打印出了:

started at 20:55:11

2019-07-11 20:55:11,769 - INFO: <Task pending coro=<say_after() running at C:/Debin/Workspace/code/python/fluentpython/testaisyncio/task_demo.py:6>>

2019-07-11 20:55:13,771 - INFO: world

2019-07-11 20:55:13,771 - INFO: <Task finished coro=<say_after() done, defined at C:/Debin/Workspace/code/python/fluentpython/testaisyncio/task_demo.py:6> result='world'>

finished at 20:55:13

 

任务的取消 -- cancel

cancel()

 

Task 对象具有 cancel 方法,允许我们取消一个已经提交到事件循环,但尚未完成的任务

该 Task 对象的协程函数会抛出 CancelledError 异常

如果在协程中捕获 CancelledError 异常,取消将会被抑制,但这是不推荐的做法

 

屏蔽取消 -- asyncio.shield

更为推荐的方法是 asyncio.shield 方法:

asyncio.shield(arg, *, loop=None)

 

arg 是一个协程方法

这个方法用于设置该对象屏蔽 cancel 方法:

res = await something() # 未屏蔽 cancel res = await shield(something()) # 屏蔽 cancel

 

 

Task 结果的获取 -- result 方法与回调

result 方法

Task 作为 Future 的子类,也同样具有 Future 的 result 方法,实现阻塞等待并获取返回

import asyncio import logging import time async def say_after(delay, what): await asyncio.sleep(delay) logging.info(what) return what if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') print(f"started at {time.strftime('%X')}") loop = asyncio.get_event_loop() task = loop.create_task(say_after(2, 'world')) loop.run_until_complete(task) print(f"finished by {task.result()} at {time.strftime('%X')}")

 

 

打印出了:

started at 21:56:12

2019-07-11 21:56:14,720 - INFO: world

finished by world at 21:56:14

 

绑定回调 -- add_done_callback

通过 add_done_callback 方法,我们可以将一个回调方法绑定到 Task 对象上,一旦任务完成运行,会自动以一个 Future 对象为参数调用预设的 callback 方法

import asyncio import logging import time async def say_after(delay, what): await asyncio.sleep(delay) logging.info(what) return what def callback(future): logging.info(f"task callback for {future.result()}") if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') print(f"started at {time.strftime('%X')}") loop = asyncio.get_event_loop() task = loop.create_task(say_after(2, 'world')) task.add_done_callback(callback) loop.run_until_complete(task) print(f"finished at {time.strftime('%X')}")

 

 

打印出了:

started at 21:58:42

2019-07-11 21:58:44,955 - INFO: world

finished at 21:58:44

2019-07-11 21:58:44,955 - INFO: task callback for world

 

取消绑定回调 -- remove_done_callback

我们也可以在调用 add_done_callback 后,通过相同参数调用 remove_done_callback 方法来取消回调

 

上面的例子中,我们使用了 asyncio.sleep 方法:

coroutine asyncio. sleep(delay, result=None, *, loop=None)

 

这个方法与 time.sleep 基本一致,都是挂起当前任务,阻塞 delay 指定的秒数,阻塞中允许其他任务运行

不同之处在于,如果传递了 result,则会在协程完成时将其返回给调用者

最后一个参数 loop 已经被废弃,预计将于 python3.10 移除

 

使用协程最重要的当然是并发运行任务,asyncio 包中,gather 方法就是用来并发运行我们的一系列协程对象的

awaitable asyncio. gather(*aws, loop=None, return_exceptions=False)

 

说明

参数

  1. aws -- 可等待对象集合
  2. loop -- 该参数已被废弃
  3. return_exceptions -- 是否等待返回时抛出异常,为 False 会立即抛出异常,否则在所有可等待对象运行完成后将异常聚合至结果列表返回

 

返回

gather 返回的同样是一个可等待对象,可以通过调用该对象的 cancel 方法取消,所有通过 gather 方法提交但尚未完成的可等待对象也会被取消

 

示例

import asyncio import logging async def factorial(name, number): f = 1 for i in range(2, number + 1): logging.info(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i logging.info(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') loop = asyncio.get_event_loop() loop.run_until_complete(main())

 

 

打印出了:

2019-07-11 21:06:43,802 - INFO: Task A: Compute factorial(2)...

2019-07-11 21:06:43,803 - INFO: Task B: Compute factorial(2)...

2019-07-11 21:06:43,803 - INFO: Task C: Compute factorial(2)...

2019-07-11 21:06:44,804 - INFO: Task A: factorial(2) = 2

2019-07-11 21:06:44,804 - INFO: Task B: Compute factorial(3)...

2019-07-11 21:06:44,805 - INFO: Task C: Compute factorial(3)...

2019-07-11 21:06:45,805 - INFO: Task B: factorial(3) = 6

2019-07-11 21:06:45,805 - INFO: Task C: Compute factorial(4)...

2019-07-11 21:06:46,806 - INFO: Task C: factorial(4) = 24

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 






读书笔记      python      并发      concurrent      并发编程      asyncio      coroutine      事件循环     


1#派客: (回复)2019-07-24 10:18:10

一般比较常用的是asyncio的几个wait方法,博主后面会不会总结这部分呢?

京ICP备15018585号