python 中的进程池与线程池 -- Future 与 Executor

2019-07-02 16:39:19   最后更新: 2019-07-02 16:39:19   访问数量:111




上一篇文章中,我们介绍了 Python multiprocessing 包中提供的强大的进程池组件

python 中的进程池 -- multiprocessing.pool.Pool

 

说到并发编程,熟悉 java 的同学一定对 java 中简单易用的 Future 类设计十分了解,python 吸收了 java 的这一设计,也提供了 Future 类来提供便利的并发编程

python 中 Future 最大的优势在于他将进程池、线程池与异步IO并发编程全部统一到同一套工具中,使用者只需要通过参数进行选择即可,极大地降低了使用者的学习成本与编程难度,本文我们就来详细介绍一下 python 中并发编程的重要组件 -- Future 的使用

 

 

python3.4 在两个包里引入了 Future 类:

  1. concurrent.futures.Future -- 用于实现进程池/线程池并发
  2. asyncio.Future -- 用于实现基于异步 IO 的并发

 

Future 类的实例表示延迟任务本身,通过 Future 实例,我们就可以拿到异步执行的计算任务的返回

通常情况下,我们不应主动创建 Future 实例,因为顾名思义,Future 对象表示未来需要做的事情,只有在排定排期后才应该被创建

例如 concurrent.futures.Executor 类的 submit 方法会为传入的可调用方法排期,从而返回一个 Future 对象

同样,我们也不应该去改变 Future 实例的对象,因为只有任务执行的过程才能够影响 Future 对象的状态

 

Future 的类方法

询问是否已完成 -- done

done(self)

 

该方法立即返回一个布尔值,用来表示当前可调用对象是否已经完成调用

 

添加执行完成后的回调 -- add_done_callback

add_done_callback(self, fn)

 

除了主动通过 done 方法轮询,我们也可以通过 add_done_callback 方法设置任务执行完成后自动调用的回调方法

 

获取任务执行结果 -- result

result(self, timeout=0)

 

result 方法用来获取任务的执行结果,如果任务执行过程中抛出了异常,则在 result 方法被调用时会重新抛出该异常

如果任务尚未返回,result 方法会阻塞等待,concurrent 包中的该方法提供了一个 timeout 参数,用来传递一个超时时间,如果在超时时间后仍未获取到结果,则会抛出 TimeoutError 异常,但 asyncio 包中的 Future 类的 result 方法则并未提供 timeout 参数

 

上面我们提到了 Executor,我们不应该自己创建 Future 对象,而是应该通过 Executor 来生成

concurrent 包中有两个类继承自 Executor,分别是:

  • ThreadPoolExecutor -- 线程池
  • ProcessPoolExecutor -- 进程池

 

他们分别维护了一个任务队列来控制并发编程,同时隐藏大量细节,让使用者在使用中足够简单

 

提交任务 -- submit

submit(fn, *args, **kwargs)

 

提交一个任务,返回一个 Future 对象用来接收执行结果

 

示例

下面的例子展示了将 15 次任务的执行提交给拥有 10 个进程的进程池,并获取返回

import logging import os from concurrent.futures import * from time import sleep def current_sleep(i): sleep(3) return '%s.%s over sleep' % (i, os.getppid()) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') results = [] with ThreadPoolExecutor(10) as executor: futrues = [] for i in range(15): futrues.append(executor.submit(current_sleep, i+1)) for futrue in futrues: logging.info(futrue.result())

 

 

打印出了:

2019-07-02 15:02:49,758 - INFO: 1.16992 over sleep

2019-07-02 15:02:49,760 - INFO: 2.964 over sleep

2019-07-02 15:02:49,760 - INFO: 3.1688 over sleep

2019-07-02 15:02:49,761 - INFO: 4.20708 over sleep

2019-07-02 15:02:49,762 - INFO: 5.25720 over sleep

2019-07-02 15:02:49,762 - INFO: 6.3792 over sleep

2019-07-02 15:02:49,765 - INFO: 7.21468 over sleep

2019-07-02 15:02:49,765 - INFO: 8.16480 over sleep

2019-07-02 15:02:49,854 - INFO: 9.13940 over sleep

2019-07-02 15:02:50,694 - INFO: 10.24184 over sleep

2019-07-02 15:02:52,758 - INFO: 11.16992 over sleep

2019-07-02 15:02:52,761 - INFO: 12.1688 over sleep

2019-07-02 15:02:52,761 - INFO: 13.964 over sleep

2019-07-02 15:02:52,762 - INFO: 14.20708 over sleep

2019-07-02 15:02:52,762 - INFO: 15.25720 over sleep

 

可以看到,前 10 个进程在同时返回,此后其中五个进程执行任务并在 sleep 完成执行后返回

 

通过迭代器提交和返回任务 -- map

map(func, *iterables, timeout=None)

 

map 方法与 multiprocessing.pool.Pool 中的 map 方法是一样的,将 iterable 参数传入的可迭代对象传递给不同的进程来处理,返回所有结果收集后的可迭代对象

可以通过 timeout 参数限制任务执行的超时,一旦超时,则会触发 TimeoutError 异常

如果任务执行过程中抛出了异常,map 方法并不会将异常抛出,只有在获取结果时才会抛出

 

示例

下面是通过 map 方法重新编写的上述示例

import logging import os from concurrent.futures import * from time import sleep def current_sleep(i): sleep(3) return '%s.%s over sleep' % (i, os.getpid()) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') with ProcessPoolExecutor(10) as executor: results = executor.map(current_sleep, range(1, 16)) for result in results: logging.info(result)

 

 

打印出了:

2019-07-02 15:11:17,526 - INFO: 1.15512 over sleep

2019-07-02 15:11:17,533 - INFO: 2.19212 over sleep

2019-07-02 15:11:17,538 - INFO: 3.6428 over sleep

2019-07-02 15:11:17,538 - INFO: 4.16516 over sleep

2019-07-02 15:11:17,542 - INFO: 5.6924 over sleep

2019-07-02 15:11:17,542 - INFO: 6.25212 over sleep

2019-07-02 15:11:17,542 - INFO: 7.22456 over sleep

2019-07-02 15:11:17,543 - INFO: 8.16536 over sleep

2019-07-02 15:11:17,623 - INFO: 9.10108 over sleep

2019-07-02 15:11:18,452 - INFO: 10.17748 over sleep

2019-07-02 15:11:20,526 - INFO: 11.15512 over sleep

2019-07-02 15:11:20,534 - INFO: 12.19212 over sleep

2019-07-02 15:11:20,539 - INFO: 13.6428 over sleep

2019-07-02 15:11:20,539 - INFO: 14.16516 over sleep

2019-07-02 15:11:20,541 - INFO: 15.22456 over sleep

 

虽然任务是在不同时间先后完成的,但只有当全部完成或超时才会返回

 

关闭进程/线程池 -- shutdown

shutdown(wait=True)

 

关闭进程/线程池,此后进程/线程池不再接受 map 或 submit 调用,否则将触发 RuntimeError

如果 wait 为 True,则阻塞等待进程/线程池关闭后返回,否则立即返回

 

此前我们介绍了 Python 中的 GIL 锁,受此影响,Python 每一个时刻只能调度一个线程,这意味着并发并没有真的在进行

而多进程则不同,多进程并发的模式中,由于进程间严格的隔离,他们得以真正的并行执行

同时,Python 多进程让多核 CPU 得以被利用

 

但相比多线程机制,多进程的模式也存在一些缺点和不足:

  1. 进程切换更为耗时
  2. 进程间通信相比线程间共享的数据更为复杂

 

因此,IO 密集型操作尽量使用 ThreadPoolExecutor 来执行,而对于 ProcessPoolExecutor,则只有在安装有多个 CPU 的高性能计算机上执行 CPU 密集型任务时,具有较大优势

 

ThreadPoolExecutor 与 ProcessPoolExecutor 分别实现了简单易用的线程池与进程池,但他们只是使用方法上的封装,底层仍然是通过调用 threading 与 multiprocessing 来实现的

对于相对简单的模式,通过 Executor 即可完成,那么使用 threading/multiprocessing 就显得过于复杂,但很多情况下,我们需要进行线程同步、进程间通信等复杂的需求,此时就只能使用 threading/multiprocessing 来实现了

 

在 python 中 Future 类被封装在两个包中:

  • concurrent.futures
  • asyncio

 

本文我们详细介绍了并发环境下,concurrent.futures 包中提供的进程池与线程池组件的用法,asyncio 包则实现了任务的异步执行,具体的使用方法敬请关注主页君的下一篇文章,谢谢

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 

《流畅的 python》

https://docs.python.org/3.4/library/concurrent.futures.html#threadpoolexecutor-example

 






读书笔记      python      concurrent      executor      future      futures      并发编程      流畅的python     


京ICP备15018585号