本文是我在学习 Python 多进程过程中的一些总结,主要介绍多进程的实现方式以及进程间的通信,大体有如下这么几点知识:
-
fork
创建进程 -
Process
创建进程 - 进程池
- 进程间通信
- 进程池的进程间通信
fork 创建进程
针对于类 UNIX 操作系统,os
模块下有一个 fork
方法,可以创建多进程。使用方法如下:
pid = os.fork()
调用 fork
方法会得到一个返回值,该返回值是一个标志值:如果是主进程,那么该返回值就是主进程的 pid
,如果是创建出来的子进程,该返回值就是0,我们可以通过这个返回值来区分主进程和子进程:
# 导入 fork 方法
from os import fork
# 使用 fork 创建进程
pid = fork()
# 根据 pid 判断是主进程还是子进程
if not pid:
print("我是子进程,pid 是 %s"%pid)
else:
print("我是主进程,pid 是 %s"%pid)
执行该程序:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess01.py
我是主进程,pid 是 9667
charley@charley-ubuntu:~/桌面/Py$ 我是子进程,pid 是 0
通过 pid 标志值群分主进程和子进程,我们同时可以看到,在主进程执行完成后其会终止,并不影响子进程的执行。也就是说:使用 fork
创建的进程,主进程并不会等待子进程执行完成,二者是独立的。
getpid 和 getppid 方法
主进程和子进程都是独立的进程,因此二者都有独立的 pid
,前面我们在使用 fork
创建进程的时候,如果是主进程,那么 fork
函数的返回值就是主进程的 pid
,而如果是子进程的话,fork
函数的返回值并不是其的 pid
,而是 0,以此来和主进程进行区分。
同时,我们可以使用 getpid
来获取进程的 pid
,使用 getppid
获取父进程的 pid
:
# 导入 os 模块
import os
# 创建进程
pid = os.fork()
# 获取进程的 pid
if not pid:
print("子进程的 pid 是 %s,子进程的 ppid 是 %s"%(os.getpid(),os.getppid()))
else:
print("父进程的 pid 是 %s,父进程的 ppid 是 %s"%(os.getpid(),os.getppid()))
运行结果:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess02.py
父进程的 pid 是 10431,父进程的 ppid 是 9576
子进程的 pid 是 10432,子进程的 ppid 是 10431
父进程也是有 ppid
的,这里它的 ppid 就是 bash 的 pid
。
进程是彼此独立的
一旦创建进程,他们就是彼此独立的,因此我们不能让两个进程修改同一个变量:
from os import fork
pid = fork()
# 创建一个列表
testList = [1,2,3]
if not pid:
# 子进程向列表中添加元素
testList.append(4)
print("子进程:",testList)
else:
# 主进程从列表中移除元素
testList.remove(3)
print("主进程:",testList)
运行结果:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess3.py
主进程: [1, 2]
子进程: [1, 2, 3, 4]
可见,两个进程是不能修改同一份数据的,可以这样理解:一旦创建了一个进程,就在原始程序的基础上创建了一份全新的副本,和原始的代码没有了关联,因此他们是无法修改同一份数据的,只可修改其所在的代码副本中的数据。
Process 创建进程
上面的 fork
可以用来在类 UNIX 操作系统中创建进程,而在 Windows 环境下就不能再使用 fork
方法,需要使用其他的方式。针对这种情况,Python 为我们提供了一个 multiprocessing
模块,通过该模块中的 Process
类也可以创建进程,并兼容各个平台。
下面是 Process
类的用法:
subprocess = Process([ target ],[ args ],[ kwargs ])
Process
类在创建进程对象时,可以接收一个函数作为参数,该函数就是我们要在进程中执行的函数,后面的 args
和 kwargs
分别是一个元组和字典,作为目标函数的参数传入。如果不传入目标函数,将会默认执行进程对象中的 run
方法。
# 导入 Process 类
from multiprocessing import Process
from time import sleep
# 定义目标函数
def getNum(num,delay):
for i in range(num):
print(i)
sleep(delay)
# 在入口程序中执行多进程操作
if __name__ == "__main__":
p = Process(target = getNum,args = (5,1))
# 开始执行进程
p.start()
运行结果:
0
1
2
3
4
使用 Process
创建的进程,主进程会等待子进程执行完成吗?我们可以进行验证:
# 导入 Process 类
from multiprocessing import Process
from time import sleep
# 定义目标函数
def getNum(num,delay):
for i in range(num):
print(i)
sleep(delay)
# 在入口程序中执行多进程操作
if __name__ == "__main__":
p = Process(target = getNum,args = (5,1))
# 开始执行进程
p.start()
print("-----我是主进程中的代码-----")
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
-----我是主进程中的代码-----
0
1
2
3
4
PS C:\Users\Charley\Desktop\py>
可见,主进程是会等待子进程执行完成再退出的。
Process 进程对象的常用方法
下面是 Process
进程对象的一些常用属性和方法:
-
pid
属性:获取当前进程的 pid -
name
属性:获取当前进程的 name -
is_alive
方法:判断进程是否在运行 -
join
方法,主进程是否等待子进程结束再执行,默认为等子进程结束后再执行主进程,也可以接受一个参数,表示最多等待多少时间 -
start
方法:启动子进程 -
run
方法:如果创建子进程对象时没有指定target
参数,则会在start
时执行子进程中的run
方法 -
terminate
方法:立即终止子进程,不论任务是否完成
扩展 Process 类
除了使用 Process
类直接创建进程,我们也可以自定义一个继承于 Process
的类来进行创建,提高了扩展性:
# 导入 Process 类
from multiprocessing import Process
from time import sleep
# 定义 Process 类的子类
class CreateSubProcess(Process):
def __init__(self,name,maxRange):
Process.__init__(self)
self.name = name
self.__maxRange = maxRange
def run(self):
for i in range(self.__maxRange):
print("%s 正在输出 %s"%(self.name,i))
sleep(1)
if __name__ == '__main__':
p = CreateSubProcess("子进程01",5)
p.start()
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
子进程01 正在输出 0
子进程01 正在输出 1
子进程01 正在输出 2
子进程01 正在输出 3
子进程01 正在输出 4
PS C:\Users\Charley\Desktop\py>
我们定义了 Process
的子类,实现了在其基础上的扩展,可以把某个独立进程相关的方法都放在子类里面,起到了很好的封装作用。
进程池
除了使用 Process
类创建进程,也可以运用进程池来实现多进程,更加节省资源和方便。进程池方便了资源管理和调度,要使用进程池,需要首先创建一个进程池对象,创建该对象时需要传入一个参数,表示池子中的最大进程数,我们所有的多任务都可以通过进程池中的进程来完成,而不是每个任务都创建一个进程,节省了资源,并且进程池会自动帮我们调度资源。
创建进程池
创建进程池很简单,只需使用 multiprocessing
模块中的 Pool
类:
pool = Pool([ maxValue ])
在创建进程池对象时可以接受一个参数,表示池子中的最大进程数,如果不传入参数,表示无进程数限制。
进程池的几个常用方法
下面是几个进程池中的常用方法:
- apply_async( target,args,kwargs ):接受一个目标函数作为池子中某个进程的执行函数,异步添加
- apply( target,args,kwargs ):同步添加,只有在当前进程执行完成后再添加目标目标函数,会造成进程池中其他进程的阻塞,少用
- close:关闭进程池,关闭后不再接受任务
- join:主进程等待进程池中的进程执行完成后再执行,因为使用进程池创建进程时,主进程默认不会等待子进程执行完成,因此该方法较常用,但只能放在
close
方法后执行。
下面来看一个进程池的例子:
from multiprocessing import Pool
from time import sleep
import os
def getNum(maxRange,delay):
for i in range(maxRange):
print("进程 %s 正在输出 %d"%(os.getpid(),i))
sleep(delay)
if __name__ == '__main__':
# 定义进程池
pool = Pool(3)
# 向进程池中添加任务
for i in range(5):
pool.apply_async(getNum,(3,1))
# 关闭进程池,不在接受任务
pool.close()
pool.join()
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
进程 2584 正在输出 0
进程 12780 正在输出 0
进程 6072 正在输出 0
进程 12780 正在输出 1
进程 2584 正在输出 1
进程 6072 正在输出 1
进程 2584 正在输出 2
进程 12780 正在输出 2
进程 6072 正在输出 2
进程 2584 正在输出 0
进程 12780 正在输出 0
进程 12780 正在输出 1
进程 2584 正在输出 1
进程 2584 正在输出 2
进程 12780 正在输出 2
PS C:\Users\Charley\Desktop\py>
上面我们创建了容量为 3 的进程池,并向其中仍进去了 5 个任务,一开始进程池中进程数是小于任务数的,所以会先执行 3 个任务,当池子中有进程完成任务后,再执行后面的任务,直到所有任务执行完成为止。
进程间通信
前面我们说到过,多个进程间是彼此独立的,无法直接修改同一份数据,但实际情况中往往是有这样的需求的,于是就要使用进程间通信。进程间通信的方式有很多,比如共享内存、socket、网络、Queue 等,我们这里讨论的是使用 Queue 进行进程间通信。
Queue
Queue
也是 multiprocessing
模块中的一个类,意为“队列”,创建一个 Queue
对象:
queue = Queue( [maxVal] )
在创建 Queue
对象时,可以传入一个参数,表示队列中的最大消息数,如果不传,表示无限制。Queue
对象中的常用方法有:
-
qsize
:返回当前队列中包含的消息数量 -
empty
:判断队列是否为空 -
full
:判断队列是否充满 -
get([ block, [timeout]])
:从队列中取出消息,如果当前队列为空,则进入阻塞状态,直到队列中有消息可以被取用为止。如果 block 设置为False
,则在队列为空时抛出Queue.Empty
异常。该方法也可以接受一个超时参数,表示最多等待多长时间,如果超过等待时间仍然没有取出消息,也会抛出Queue.Empty
异常。 -
get_nowait
:相当于get(False)
-
put(item, [block])
:向队列中放入消息,当队列充满时,会进入等待状态,直到队列中可以放入消息为止。如果 block 设置为False
,则在队列充满时抛出Queue.Full
异常。 -
put_nowait
:相当于put(item,False)
使用 Queue 进行进程间通信
我们可以利用 Queue
进行进程间通信,只需将 Queue
对象传入相应的任务函数中:
from multiprocessing import Queue,Process
from time import sleep
import os
def getVal(queue):
while True:
if queue.empty():
print("进程 %s 已经取完了~"%os.getpid())
break
else:
print("进程 %s 获取到了数据 %d"%(os.getpid(),queue.get()))
sleep(1)
def putVal(queue,maxRange):
for i in range(maxRange):
queue.put(i)
print("进程 %s 放入了数据 %d"%(os.getpid(),i))
sleep(1)
# 创建队列
queue = Queue()
# 创建两个进程
gv = Process(target = getVal,args = (queue,))
pv = Process(target = putVal,args = (queue,10))
# 启动进程
if __name__ == "__main__":
pv.start()
gv.start()
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
进程 22208 放入了数据 0
进程 20656 获取到了数据 0
进程 22208 放入了数据 1
进程 20656 获取到了数据 1
进程 22208 放入了数据 2
进程 20656 获取到了数据 2
进程 22208 放入了数据 3
进程 20656 获取到了数据 3
进程 22208 放入了数据 4
进程 20656 获取到了数据 4
进程 22208 放入了数据 5
进程 20656 获取到了数据 5
进程 22208 放入了数据 6
进程 20656 获取到了数据 6
进程 22208 放入了数据 7
进程 20656 获取到了数据 7
进程 22208 放入了数据 8
进程 20656 获取到了数据 8
进程 22208 放入了数据 9
进程 20656 获取到了数据 9
进程 20656 已经取完了~
PS C:\Users\Charley\Desktop\py>
进程池中的通信
上面的进程间通信是基于 Process
类(或者其子类)创建出的进程,可以直接使用 multiprocessing
中的 Queue
队列,但对于进程池,需要使用 multiprocessing
中的 Manager
类,除此之外,其他操作并没有变化:
# 导入 Manager 和 Pool 类
from multiprocessing import Manager,Pool
from time import sleep
import os
def getVal(queue):
while True:
if queue.empty():
print("进程 %s 已经取完了~"%os.getpid())
break
else:
print("进程 %s 获取到了数据 %d"%(os.getpid(),queue.get()))
sleep(1)
def putVal(queue,maxRange):
for i in range(maxRange):
queue.put(i)
print("进程 %s 放入了数据 %d"%(os.getpid(),i))
sleep(1)
# 入口方法
def main():
# 创建队列
queue = Manager().Queue()
# 创建进程池
pool = Pool(2)
pool.apply_async(putVal,(queue,5))
pool.apply_async(getVal,(queue,))
# 关闭进程池并使主进程等待
pool.close()
pool.join()
if __name__ == '__main__':
main()
运行结果如下:
PS C:\Users\Charley\Desktop\py> python .\py.py
进程 15304 放入了数据 0
进程 17264 获取到了数据 0
进程 15304 放入了数据 1
进程 17264 获取到了数据 1
进程 15304 放入了数据 2
进程 17264 获取到了数据 2
进程 15304 放入了数据 3
进程 17264 获取到了数据 3
进程 15304 放入了数据 4
进程 17264 获取到了数据 4
进程 17264 已经取完了~
PS C:\Users\Charley\Desktop\py>
针对于进程池中的通信,只需改变创建队列对象的方式即可,其他的操作都不变。
总结
本文主要介绍了 Python 中的多进程,下面是一些要点:
- 使用
fork
在类 UNIX 操作系统创建进程 - 使用兼容各平台的
Process
- 扩展
Process
类,封装进程模块 - 使用
Pool
创建进程池 - 几种创建方式下,主进程是否会等待子进程执行完成
- 进程对象的几个常用方法
- 进程池对象的常用方法
- 队列的常用方法
- 进程间通信
- 使用进程池时如何通信
完。