python 中的进程池 -- multiprocessing.pool.Pool

2019-06-27 22:08:43   最后更新: 2019-06-27 22:08:43   访问数量:64




上一篇文章中,我们介绍了如何通过 multiprocessing 进行多进程并发编程

通过 multiprocessing 实现 python 多进程

 

本文,我们来介绍一下 multiprocessing 中提供的进程池组件 -- Pool

 

 

通过上一篇文章中所介绍的 multiprocessing 开辟进程的方式来实现并发编程存在下面的几个问题:

  1. 耗时 -- 进程执行方法前开辟,方法执行后自动退出,进程开辟与回收都十分耗时
  2. 资源利用率低 -- 进程反复开辟与回收,无法实现资源的重复利用
  3. 编程繁琐 -- 每个进程对象都需要创建并调用 start 方法才可以被执行
  4. 不利于资源管理 -- 如果系统资源有限,同时只能有有限个进程执行,则通过上文开辟进程的方法,我们完全无法控制进程执行的具体行为

 

解决上述问题最简单的方式就是池化执行,由进程池来管理并复用若干个进程,就可以解决上述的所有问题,既限制了同时最大的并发进程数,也避免了反复开辟与回收的资源浪费,保证了最大的资源利用效率

multiprocessing 提供了进程池组件 -- Pool,让我们方便的创建一个进程池

 

构造方法

multiprocessing.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)

 

参数介绍

  1. processes -- 进程池中进程数量,如果为 None,则使用 os.cpu_count() 返回的值
  2. initializer -- 如果该参数不为 None,则所有进程池中的进程启动时都会先执行 initializer(*initargs)
  3. maxtasksperchild -- 如果该参数不为 None,则进程在执行 maxtasksperchild 次任务后会被自动销毁、重启
  4. context -- 用于指定进程池中进程运行的上下文,具体参考上篇文章中的介绍

 

进程池最重要的就是使用了,但需要注意的是,所有下面这些方法都必须由创建进程池的进程调用

 

apply

apply(func, args=None, kwds=None)

 

同步执行函数 func

 

示例

import logging import os from multiprocessing.pool import Pool from time import sleep def f(): logging.info('f %s' % os.getpid()) sleep(1) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') with Pool(4) as pool: pool.apply(f) pool.apply(f) pool.apply(f) pool.apply(f) pool.apply(f)

 

 

打印出了:

2019-06-26 16:25:20,028 - INFO: f 198

2019-06-26 16:25:21,035 - INFO: f 199

2019-06-26 16:25:22,040 - INFO: f 200

2019-06-26 16:25:23,043 - INFO: f 201

2019-06-26 16:25:24,045 - INFO: f 198

 

可以看到,前4次调用 f 依次使用了进程池中的四个不同的进程,最后一次调用复用了第一个进程,每次调用都等待了 1 秒钟

 

apply_async

apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)

 

上面的例子中,每次调用都间隔了 1 秒钟,没有实现真正的并发,所以我们需要异步执行所有的调用

apply_async 就是 apply 的异步版本

参数与 apply 大体相同,增加了可选的执行完成后自动调用的回调方法参数

 

示例

import logging import os import time from multiprocessing.pool import Pool from time import sleep def f(): sleep(1) return '%s finish f_call at %s' % (os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')) if __name__ == '__main__': result = [] with Pool(4) as pool: result.append(pool.apply_async(f)) result.append(pool.apply_async(f)) result.append(pool.apply_async(f)) result.append(pool.apply_async(f)) result.append(pool.apply_async(f)) for res in result: logging.info(res.get(timeout=10))

 

 

打印出了:

2019-06-26 18:26:10,563 - INFO: 7212 finish f_call at 2019-06-26 18:26:10

2019-06-26 18:26:10,563 - INFO: 5440 finish f_call at 2019-06-26 18:26:10

2019-06-26 18:26:10,672 - INFO: 7044 finish f_call at 2019-06-26 18:26:10

2019-06-26 18:26:11,577 - INFO: 7212 finish f_call at 2019-06-26 18:26:11

2019-06-26 18:26:11,577 - INFO: 5440 finish f_call at 2019-06-26 18:26:11

 

通过异步,我们做到了真正的并发调用

 

map

map(func, iterable, chunksize=0)

 

与 Python 标准库中的 map 方法有着相同的用法和功能,不同的是,进程池中的该方法会将 iterable 参数传入的可迭代对象分成 chunksize 份传递给不同的进程来处理

 

示例

import logging import os from multiprocessing.pool import Pool from time import sleep import time def f(i): print('%s get %s at %s' % (os.getpid(), i, time.strftime('%Y-%m-%d %H:%M:%S'))) sleep(1) if __name__ == '__main__': with Pool(4) as pool: pool.map(f, range(10))

 

 

打印出了:

107 get 0 at 2019-06-27 19:50:30

109 get 2 at 2019-06-27 19:50:30

108 get 1 at 2019-06-27 19:50:30

110 get 3 at 2019-06-27 19:50:30

107 get 4 at 2019-06-27 19:50:31

109 get 5 at 2019-06-27 19:50:31

108 get 6 at 2019-06-27 19:50:31

110 get 7 at 2019-06-27 19:50:31

107 get 8 at 2019-06-27 19:50:32

109 get 9 at 2019-06-27 19:50:32

 

可以看到,进程池中的四个进程在同一时刻实现了并发调用,随后并发等待1秒后进行下一轮并发调用

与 apply 的同步调用相比,性能有了很大幅度的提升了

 

map_async

map_async(func, iterable, chunksize=0, callback=None, error_callback=None)

 

与 apply_async 类似,map_async 是 map 的异步版本,我们可以通过他返回的对象的阻塞调用 get 方法来获取进程执行后的结果,与 apply_async 不同的是,map_async 会先收集多个进程的运行结果后返回

 

imap

imap(func, iterable, chunksize=0)

 

有时,我们调用 map 传入的可迭代对象的可迭代次数会非常多,如果通过 map 来进行任务的分配和回收,显然会因为计算量过大而出现过度耗时的情况

此前的文章中,我们介绍过生成器与协程,是否可以借助协程的思想通过迭代器与 next 方法逐步获取结果呢?python 进程池已经考虑到这一情况,并引入了 imap 方法,来返回一个迭代器,通过 next 方法逐步拿到其运行结果

imap 方法与 map 方法在参数上是一模一样的,不同之处仅在于其返回的结果

他返回的结果对象是一个迭代器,可以通过向标准库 next 方法传入该迭代器来迭代结果,也可以调用迭代器本身提供的 next 方法来获取结果,值得一提,迭代器本身提供的 next 方法允许传入一个整数的 timeout 参数作为最大超时

 

示例

import logging import os from multiprocessing.pool import Pool from time import sleep import time def f(i): sleep(1) return '%s finsh sleep by %s at %s' % (os.getpid(), i, time.strftime('%Y-%m-%d %H:%M:%S')) if __name__ == '__main__': with Pool(4) as pool: it = pool.imap(f, range(10)) res = next(it) while res: print(res) res = next(it)

 

 

打印出了:

181 get 0 at 2019-06-27 20:02:13

183 get 2 at 2019-06-27 20:02:13

182 get 1 at 2019-06-27 20:02:13

184 get 3 at 2019-06-27 20:02:13

181 get 4 at 2019-06-27 20:02:14

181 finsh sleep by 0 at 2019-06-27 20:02:14

183 get 5 at 2019-06-27 20:02:14

182 get 6 at 2019-06-27 20:02:14

184 get 7 at 2019-06-27 20:02:14

182 finsh sleep by 1 at 2019-06-27 20:02:14

183 finsh sleep by 2 at 2019-06-27 20:02:14

184 finsh sleep by 3 at 2019-06-27 20:02:14

181 get 8 at 2019-06-27 20:02:15

181 finsh sleep by 4 at 2019-06-27 20:02:15

183 get 9 at 2019-06-27 20:02:15

183 finsh sleep by 5 at 2019-06-27 20:02:15

182 finsh sleep by 6 at 2019-06-27 20:02:15

184 finsh sleep by 7 at 2019-06-27 20:02:15

181 finsh sleep by 8 at 2019-06-27 20:02:16

183 finsh sleep by 9 at 2019-06-27 20:02:16

Traceback (most recent call last):

  File "/usr/lib/python3.6/multiprocessing/pool.py", line 720, in next

    item = self._items.popleft()

IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "x.py", line 21, in <module>

    res = next(it)

  File "/usr/lib/python3.6/multiprocessing/pool.py", line 723, in next

    raise StopIteration

StopIteration

 

我们看到,每一轮调用都是 next 方法触发的,和我们曾经介绍过的一样,最终迭代器的最后一次 next 调用会抛出 StopIteration 异常

 

正如我们可以给进程发送 SIGINT 与 SIGTERM 两种信号来关闭进程或强制终止进程,进程池也提供了两种终止的方法

 

正常退出 -- close

close()

 

等待进程池中所有已分配任务执行结束后退出

 

强制中止 -- terminate

terminate()

 

立即强制中止所有进程并退出

 

等待退出 -- join

join()

 

进程池同时也提供了 join 方法,用来阻塞等待直到进程池中所有进程均执行结束

 

multiprocessing 中进程池的使用,与我们上一篇文章中讲述的 multiprocessing 子进程创建并执行并发请求从本质上与风格上都是一致的,只是对我们的程序编写来说简化了大量的管理与操作的代码,让我们将目光完全集中于实际任务的编写

使用过 java 的 future 对象的同学一定会觉得 multiprocessing 提供了这么多不同类型的执行方法让人有些难以选择,而隐藏了具体细节的 Future 则显得更加抽象和易用

python 的设计也参考了 java 中的设计,实现了 Futrue 对象,同时统一了进程池与线程池的用法,敬请期待下一篇文章我们的详细介绍

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 






技术帖      python      进程      进程池      pool      multiprocessing     


京ICP备15018585号