摘要:Python
,多进程
进程是操作系统中具体的处理任务,米一个进程都会有自己独立的内存空间
,它是线程的载体。当一个程序启动时,会默认启动一个进程,将该进程装载到内存,同时在该进程中还会默认启动一个线程,来执行本进程中的内容。
创建多进程程序
在Python中进程的相关实现被封装在模块multiprocessing
中,进程的创建方法和线程一样有两种
- 直接通过multiprocessing中的进程类Process创建
- 继承multiprocessing的Process类,重写run方法
类似于线程创建,对Process类实例化时直接将进程的处理函数传入即可,通过target
来接收处理函数,创建好进程后通过实例化对象的start方法将其启动
(1)使用类实例化方法创建进程
定义进程处理函数 run_proc 。在实例化 Process 时 ,将 run_proc 传入 。接着使用实例化对象
start 方法启动进程,然后调用join代表该子进程结束之后主进程才能结束
import time
import os
from multiprocessing import Process
def run_proc(sid):
print("{} start, pid:{}".format(sid, os.getpid()))
time.sleep(3)
print("{} end, pid:{}".format(sid, os.getpid()))
if __name__ == '__main__':
p1 = Process(target=run_proc, args=("a", ))
p2 = Process(target=run_proc, args=("b", ))
print("main process: {}".format(os.getpid()))
p1.start()
p2.start()
p1.join()
p2.join()
print("main process end...")
输出如下,课件主进程和子进程的进程号pid都不一样,在调用join之后主进程阻塞直到所有子进程结束
main process: 12611
a start, pid:12612
b start, pid:12613
a end, pid:12612
b end, pid:12613
main process end...
(2)使用继承类的方式创建进程
实现自定义进程类使其继承于系统进程类 Process 。重写类中的 run 方法,
import time
import os
from multiprocessing import Process
def run_proc(sid):
print("{} start, pid:{}".format(sid, os.getpid()))
time.sleep(3)
print("{} end, pid:{}".format(sid, os.getpid()))
class MyProcess(Process):
def __init__(self, sid):
super().__init__()
self.sid = sid
def run(self):
run_proc(self.sid)
if __name__ == '__main__':
p1 = Process(target=run_proc, args=("a",))
p2 = Process(target=run_proc, args=("b",))
print("main process: {}".format(os.getpid()))
p1.start()
p2.start()
p1.join()
p2.join()
print("main process end...")
输出如下,和直接实例化Process类输出效果一致
main process: 14859
a start, pid:14860
b start, pid:14861
a end, pid:14860
b end, pid:14861
main process end...
创建进程池
当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候就要编写自己的线程池/进程池,在Python只有两个模块实现进程池,分别是concurrent.futures
和multiprocessing.Pool
(1)multiprocessing.Pool
传递给Pool一个参数设置进程池内的最大进程数,Pool有多个方法,主要是apply
,apply_async
,map
,map_async
,starmap
,starmap_async
,区别如下
-
apply
:单次同步执行,每次执行传入一个执行函数的参数,并且执行完毕才能执行下一个进程,如果执行函数有返回值返回最后一个执行完进程的值 -
apply_async
:单次启动一个任务,传入一个执行函数的参数,异步执行,启动后不等这个进程结束又开始执行新任务,如果执行函数有返回值返回最后一个执行完进程的对象,需要调用get获得结果值 -
map
:执行一个任务列表,传入执行函数的参数一个参数列表,同步执行,即主进程阻塞直到任务列表中的任务全部执行完毕,如果执行函数有返回值,返回一个结果列表,顺序和输入列表的元素顺序一致 -
map_async
:执行一个任务列表,不需要等待任务列表中的任务执行完毕,主进程可以继续往下执行,如果执行函数有返回值,返回一个结果对象,需要调用get方法得到输出值,顺序和输入列表的元素顺序一致 -
starmap
:和map执行类似,区别是map对执行函数只能传入一个参数,而startmap可以传入多个参数,如果执行函数有返回值,返回一个结果列表,顺序和输入列表的元素顺序一致 -
starmap_async
:和map_async执行类似,区别是map_async对执行函数只能传入一个参数,而starmap_async可以传入多个参数,如果执行函数有返回值,返回一个结果对象,需要调用get方法得到输出值,顺序和输入列表的元素顺序一致
除此之外,在执行完毕多进程任务后,调用close
关闭Process对象,释放与之关联的所有资源,调用join
阻塞主进程,直到调用join的子进程执行完毕再退出
下面用代码测试说明
- apply
import time
import datetime
import os
from multiprocessing import Pool
def run_proc(sid):
time.sleep(3)
print("{} end, pid:{}, time:{}".format(sid, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
return str(sid)
if __name__ == '__main__':
pool = Pool(4)
jobs = ["a", "b", "c", "d"]
for i in jobs:
pool.apply(run_proc, i)
pool.close()
pool.join()
输出如下,可见apply每次只输入一个执行函数的参数,即每次执行一次,多个参数组成的任务列表只能一个接着一个执行
a end, pid:25260, time:2021-05-16 18:15:08
b end, pid:25261, time:2021-05-16 18:15:11
c end, pid:25262, time:2021-05-16 18:15:14
d end, pid:25263, time:2021-05-16 18:15:17
2.apply_async
直接修改Pool的apply方法改为apply_assync
if __name__ == '__main__':
pool = Pool(4)
jobs = ["a", "b", "c", "d"]
for i in jobs:
pool.apply_async(run_proc, i)
print("main process")
pool.close()
pool.join()
输出如下,可见达到了并行效果,进程之间执行任务互补阻塞,主进程也没有被阻塞,main process被提前打印了出来
main process
a end, pid:25721, time:2021-05-16 18:17:33
b end, pid:25722, time:2021-05-16 18:17:33
c end, pid:25723, time:2021-05-16 18:17:33
d end, pid:25724, time:2021-05-16 18:17:33
3.map
修改Pool的方法为map
if __name__ == '__main__':
pool = Pool(4)
jobs = ["a", "b", "c", "d"]
pool.map(run_proc, jobs)
print("main process")
pool.close()
pool.join()
输出如下,可见map达到了并行的效果,但是主进程被阻塞,即map执行完任务列表后,主进程才执行
a end, pid:26939, time:2021-05-16 18:21:43d end, pid:26942, time:2021-05-16 18:21:43
b end, pid:26940, time:2021-05-16 18:21:43
c end, pid:26941, time:2021-05-16 18:21:43
main process
4.map_async
if __name__ == '__main__':
pool = Pool(4)
jobs = ["a", "b", "c", "d"]
res = pool.map_async(run_proc, jobs)
print("main process...")
pool.close()
pool.join()
print(res.get())
输出如下,可见效果了map类似,区别是主进程没有被阻塞,main process被提前打印了出来
main process...
c end, pid:21401, time:2021-05-16 21:41:54
b end, pid:21400, time:2021-05-16 21:41:54a end, pid:21399, time:2021-05-16 21:41:54
d end, pid:21402, time:2021-05-16 21:41:54
['a', 'b', 'c', 'd']
5.starmap
将执行函数的入参改为两个,Pool函数改为starmap,并且传入的任务列表的每个参数也改为两个
def run_proc(sid, other):
time.sleep(3)
print("{} end, other:{}, pid:{}, time:{}".format(sid, other, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
return str(sid) + str(other)
if __name__ == '__main__':
pool = Pool(4)
jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
pool.starmap(run_proc, jobs)
print("main process")
pool.close()
pool.join()
输入如下,执行函数正确的接收了两个参数,主进程阻塞
b end, other:r, pid:27642, time:2021-05-16 18:24:52
a end, other:b, pid:27641, time:2021-05-16 18:24:52
c end, other:d, pid:27643, time:2021-05-16 18:24:52
d end, other:a, pid:27644, time:2021-05-16 18:24:52
main process
看一个map是否支持两个参数
if __name__ == '__main__':
pool = Pool(4)
jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
pool.map(run_proc, jobs)
print("main process")
pool.close()
pool.join()
直接报错,执行函数缺少参数
TypeError: run_proc() missing 1 required positional argument: 'other'
6.starmap_async
if __name__ == '__main__':
pool = Pool(4)
jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
res = pool.starmap_async(run_proc, jobs)
print("main process")
pool.close()
pool.join()
print(type(res.get()))
输出如下,可见和starmap相比,主进程没有被阻塞,其他一样
main process
a end, other:b, pid:17123, time:2021-05-16 21:28:30
b end, other:r, pid:17124, time:2021-05-16 21:28:30
d end, other:a, pid:17126, time:2021-05-16 21:28:30
c end, other:d, pid:17125, time:2021-05-16 21:28:30
<class 'list'>
多线程和多进程的区别
多进程和多线程都可以用并行机制来提升系统的运行效率,二者的区别在于运行时所占的内存分布不同
- 多线程是共用一套内存的代码块区间
- 多进程是各用一套独立的内存区间
在大型计算机集群系统中,都会将多进程程序分布运行在不同的计算机上协同工作,而每一台机器上的进程内部,又会由多个线程来并行工作