通过 multiprocessing 实现 python 多进程

2019-05-22 14:18:41   最后更新: 2019-05-22 14:20:06   访问数量:136




此前一系列文章中,我们介绍了 Python 的threading 包中的一系列工具

python 的线程

Python 线程同步(一) -- 竞争条件与线程锁

python 线程同步(二) -- 条件对象

python 线程同步(三) -- 信号量

python 线程同步(四) -- 事件对象与栅栏

 

threading 包为 Python 提供了线程模型,而 multiprocessing 包则为另一种并发模型 -- 多进程模型提供了强大的解决方案

multiprocessing 与 threading 十分相似,他提供了基本的进程对象类以及功能强大的进程同步工具,同时,multiprocessing 还提供了进程池的封装类 Pool

 

 

 

此前我们介绍了 Python 中的 GIL 锁,受此影响,Python 每一个时刻只能调度一个线程,这意味着并发并没有真的在进行

而多进程则不同,多进程并发的模式中,由于进程间严格的隔离,他们得以真正的并行执行

同时,Python 多进程让多核 CPU 得以被利用

 

但相比多线程机制,多进程的模式也存在一些缺点和不足:

  1. 进程切换更为耗时
  2. 进程间通信相比线程间共享的数据更为复杂

 

multiprocessing 提供的方法
方法描述
active_children返回当前进程存活的子进程的列表
cpu_count返回系统的 CPU 数量,但并不是当前进程可用的数量,len(os.sched_getaffinity(0)) 方法获取的是当前进程可用的数量
current_process获取当前进程的 Process 对象
get_all_start_methods返回支持的启动方法的列表,该列表的首项即为默认选项,包括我们后面即将要介绍的 'fork', 'spawn' 和 ’forkserver’
get_context返回进程上下文 Context 对象
get_start_method获取当前启动进程的启动方法,'fork' , 'spawn' , 'forkserver' 或者 None(如果没有设置)
set_executable设置在启动子进程时使用的 Python 解释器路径,例如:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
set_start_method设置启动子进程的方法 'fork' , 'spawn' 或者 'forkserver'

 

你会发现 Process 类与 Thread 类十分相似,他们都通过 start 方法启动并开始执行 run 方法的内容,同时,join 用来阻塞等待某个进程完成执行

但是不同的是,这些方法只能由被调用进程的父进程来调用

 

Process 类成员

类成员属性

  • name -- 进程名
  • daemon -- 布尔值,是否是守护进程
  • pid -- 进程 id
  • exitcode -- 进程退出时的退出码,如果被信号终止,则返回信号值的相反数,进程未退出前该值为 None
  • authkey -- 进程身份秘钥,字节字符串,当 multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串,创建 Process 对象时,子进程继承父进程的身份秘钥
  • sentinel -- 系统对象的数字句柄,在 UNIX 上是一个文件描述符

 

类成员方法

  • run -- 进程的具体活动
  • start -- 启动进程活动
  • join -- 等待进程执行结束或超时
  • is_alive -- 判断进程是否存活
  • terminate -- 终止进程,在 UNIX 环境中,通过给进程发送 SIGTERM 信号实现,在 Windows 环境中,通过 TerminateProcess 方法实现,被终止进程的子进程将不会被一起终止

 

需要注意的是,正如我们上面所说,start() 、 join() 、 is_alive() 、 terminate() 和 exitcode 方法只能由创建进程对象的进程调用

 

示例 -- 通过 Process 类创建进程

通过继承 Process 类实现子进程创建

import logging from multiprocessing import Process from time import sleep, ctime class myProcess(Process): def __init__(self, nsec): super().__init__() self.nsec = nsec def run(self): logging.info('start_sleep[%s]' % self.nsec) sleep(self.nsec + 1) logging.info('end_sleep[%s]' % self.nsec) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') logging.info('start at %s'% ctime()) processes = list() for i in range(5): t = myProcess(i) processes.append(t) for process in processes: process.start() for process in processes: process.join() logging.info('end at %s' % ctime())

 

 

打印出了:

2019-05-22 10:02:24,705 - INFO: start at Wed May 22 10:02:24 2019

2019-05-22 10:02:24,739 - INFO: start_sleep[2]

2019-05-22 10:02:24,740 - INFO: start_sleep[1]

2019-05-22 10:02:24,742 - INFO: start_sleep[4]

2019-05-22 10:02:24,743 - INFO: start_sleep[0]

2019-05-22 10:02:24,743 - INFO: start_sleep[3]

2019-05-22 10:02:25,745 - INFO: end_sleep[0]

2019-05-22 10:02:26,746 - INFO: end_sleep[1]

2019-05-22 10:02:27,747 - INFO: end_sleep[2]

2019-05-22 10:02:28,748 - INFO: end_sleep[3]

2019-05-22 10:02:29,749 - INFO: end_sleep[4]

2019-05-22 10:02:29,750 - INFO: end at Wed May 22 10:02:29 2019

 

通过创建 Process 对象实现子进程创建

import logging from multiprocessing import Process from time import sleep, ctime def sleep_func(pindex): logging.info('start_sleep[%s]' % pindex) sleep(pindex + 1) logging.info('end_sleep[%s]' % pindex) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') logging.info('start at %s' % ctime()) processes = list() for i in range(5): process = Process(target=sleep_func, args=[i]) processes.append(process) for process in processes: process.start() for process in processes: process.join() logging.info('end at %s' % ctime())

 

 

打印出了:

2019-05-22 10:07:30,340 - INFO: start at Wed May 22 10:07:30 2019

2019-05-22 10:07:30,346 - INFO: start_sleep[2]

2019-05-22 10:07:30,348 - INFO: start_sleep[3]

2019-05-22 10:07:30,350 - INFO: start_sleep[0]

2019-05-22 10:07:30,351 - INFO: start_sleep[4]

2019-05-22 10:07:30,349 - INFO: start_sleep[1]

2019-05-22 10:07:31,353 - INFO: end_sleep[0]

2019-05-22 10:07:32,353 - INFO: end_sleep[1]

2019-05-22 10:07:33,354 - INFO: end_sleep[2]

2019-05-22 10:07:34,353 - INFO: end_sleep[3]

2019-05-22 10:07:35,353 - INFO: end_sleep[4]

2019-05-22 10:07:35,354 - INFO: end at Wed May 22 10:07:35 2019

 

可以看到,multiprocessing 的用法与 threading 中的用法简直是一模一样

 

根据不同的平台,multiprocessing 有三种启动进程的方法:

  1. spawn -- 父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源,父进程中非必须的文件描述符和句柄则不会被继承,与另两种方法相比,这个方法启动进程非常慢,是 windows 上的默认设置,也可用在 Unix 中
  2. fork -- 通过 os.fork() 方法创建子进程,子进程在开始时与父进程完全相同,会继承父进程中的所有资源,只能用于 Unix,是 Unix 系统中的默认方式
  3. forkserver -- 启动服务器进程,并从此刻开始,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程,从而避免父进程中的资源被继承,只能用于 Unix 环境中

 

通过 multiprocessing.set_start_method 方法,可以设置不同的启动方法:

import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()

 

 

注意

需要注意的是,在程序中 set_start_method() 不应该被多次调用,不同上下文启动的进程可能是不兼容的,比如使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程

 

下面我们来对比一下多进程、多线程运行 CPU 密集型任务的耗时情况:

import time from multiprocessing import Process from threading import Thread def count(x, y): # 使程序完成50万计算 c = 0 while c < 500000: c += 1 x += x y += y if __name__ == '__main__': t = time.time() for x in range(10): count(1, 1) print("Line", time.time() - t) counts = [] t = time.time() for x in range(10): thread = Thread(target=count, args=(1, 1)) counts.append(thread) thread.start() for thread in counts: thread.join() print("Threading", time.time() - t) counts = [] t = time.time() for x in range(10): process = Process(target=count, args=(1, 1)) counts.append(process) process.start() for process in counts: process.join() print("Multiprocess", time.time() - t)

 

 

程序非常简单,我们分别进行 50 万次计算,得到结果如下:

Line 69.52206325531006

Threading 55.799378633499146

Multiprocess 44.240989685058594

 

多进程运行确实有着性能优势,但也没有我们想象中那么大

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 






python      多进程      multiprocessing     


京ICP备15018585号