multiprocessing中的Queue
首先我们知道,在各个进程中,每个全局变量与局部变量是不能与其他进程的通讯的,所以说这里可以通过消息队列的方式完成进程间的通讯。
代码见下例子
#coding=utf-8
from multiprocessing import Queue,Process
import time
def write(que):
for value in ['A','B','C','D']:
que.put(value)
print('%s已经被添加至que中'%value)
time.sleep(1)
def read(que):
for i in range(que.qsize()):
print(que.get()+'已被导出')
time.sleep(1)
if __name__ == '__main__':
que = Queue()
p1 = Process(target = write,args = (que,))
p1.start()
p1.join()
p2 = Process(target = read,args = (que,))
p2.start()
p2.join()
print('所有数据导出完毕')
执行结果如下:
当然这是在multiprocessing模块中使用Process的结果。
multiprocessing中的Manager().Queue()#
如果考虑使用进程池Pool的话,我们必须使用multiprocessing模块中的Manager来创建消息队列
代码如下:
#coding=utf-8
from multiprocessing import Pool,Manager
import time,os
def write(que):
print('启动write进程,pid=%s,ppid=%s'%(os.getpid(),os.getppid()))
for i in ['A','B','C','D']:
que.put(i)
print('%s已经添加至que中'%i)
time.sleep(1)
def read(que):
print('启动read进程,pid=%s,ppid=%s'%(os.getpid(),os.getppid()))
for i in range(que.qsize()):
print(que.get()+'已被移除')
time.sleep(1)
if __name__ == '__main__':
po = Pool()
que = Manager().Queue()
po.apply(write,(que,))
po.apply(read,(que,))
po.close()
po.join()
print('主进程%s结束'%os.getpid())
执行结果如下:
Queue模块中的Queue()
此部分介绍在Python同步文章中有介绍,详细看那里