这篇文章简单介绍一下multiprocessing包中的进程池程管理工具Pool。如果你是第一次接触python的多进程,请先看一下我的前两篇文章https://www.jianshu.com/p/31bca20caec0和https://www.jianshu.com/p/e7a5f3b2afcf。如果只想了解Pool,那就接着看吧。在上一篇文章中我们介绍了Process的主要应用,使用Process可以建立一个进程,如果建立多个进程可以使用多个Process建立不同的的进程对象,也可以使用for循环建立多个进程,具体内容请看我在简书的上一篇文章。
我们引入进程池工具Pool,直接把进程扔到池里,根据池自身的特点对进程进行管理,后面我会详细介绍,这里只需要知道他是一个容器能够够使制定数量的进程在池中统一运行即可,下面来看一段简单的代码:
# 多个进程同时运行没有返回值
def f(name):
"""定义一个简单的打印名字的函数"""
print('hello:', name)
if __name__ == '__main__':
name_list = ["小飞", "小倩", "是不是淘气", "二妞"]
pool = Pool(processes=4) # 创建4个进程
for name in name_list:
pool.apply_async(f, (name,))
pool.close() # 关闭进程池,表示不能在往进程池中添加进程
pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用
返回的结果为:
hello: 小飞
hello: 小倩
hello: 是不是淘气
hello: 二妞
在上面的例子中我们使用Pool建立了一个可以存放四个进程的池,使用for循环,将进程依次加入到进程池中,对于每一个进程使用apply_async()方法建立进程。对于进程池Pool这个类,主要有两种建立进程的方式一种是apply_async(),另一种是map(),这两种类方法的解释如下:
apply_async函数原型:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
与apply用法一样,但它是非阻塞且支持结果返回进行回调。
map 函数原型:
map(func, iterable[, chunksize=None])
Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
之前对close、terminate、join方法已经有了介绍,这里在介绍一次:
close()
关闭进程池(pool),使其不在接受新的任务。
terminate()
结束工作进程,不在处理未处理的任务。
join()
主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。
上面的案例中,每个进程都没有返回值,下面介绍一个有返回值的,先看一下代码,我再来解释:
# 多个进程同时运行,每个进程都有返回值
def add_part(part):
"""计算一个列表的开始到末尾的累加
:param part:长度为二的列表
:return: 返回累加的和
"""
result = 0
for value in range(part[0], part[1] + 1):
result += value
return result
if __name__ == '__main__':
# 第一种方式使用pool.apply_async(),使用pool.get()来获取结果
startTime = datetime.datetime.now()
part_result = []
pool = Pool(processes=2)
for i in [[0, 500000000], [500000001, 1000000000]]:
part_result.append(pool.apply_async(add_part, (i,)))
pool.close()
pool.join()
add_result = 0
for index, value in enumerate(part_result, 0):
print("第%s个进程的结果为%s" % (index, value.get()))
add_result += value.get()
print("结果为", add_result)
print(datetime.datetime.now() - startTime)
这里我们继续使用我们的老例子,将一个累加的过程分成两个进程,这里我就不对案例的过程详细介绍了,如果没看懂的先出门看一下我前面的两篇文章:https://www.jianshu.com/p/31bca20caec0和https://www.jianshu.com/p/e7a5f3b2afcf。这里我主要介绍一下和之前不同的地方,这里是有返回值的,我们在上面的程序中使用 part_result = []来存储我们建立的进程,后面我们在使用for循环遍历这个存储进程的列表,使用get()方法得到进程的结果并将其累加。我们来看一些结果:
第0个进程的结果为125000000250000000
第1个进程的结果为375000000250000000
结果为 500000000500000000
0:00:34.825516
上面的例子中我们使用了Pool来对有返回的进程距离一个例子,但是这里使用了get()来收集进程的返回值,下面我们来使用map()方法来直接收集进程返回的结果,上代码:
def add_part(part):
"""计算一个列表的开始到末尾的累加
:param part:长度为二的列表
:return: 返回累加的和
"""
result = 0
for value in range(part[0], part[1] + 1):
result += value
return result
if __name__ == '__main__':
# 第二种方式使用pool.map(),直接获取所有进程的结果获取结果
startTime = datetime.datetime.now()
pool = Pool(processes=2)
part_result = (pool.map(add_part, [[0, 500000000], [500000001, 1000000000]]))
pool.close()
pool.join()
print(part_result)
add_result = 0
for i in part_result:
print(i)
add_result += i
print("结果为:", add_result)
print(datetime.datetime.now() - startTime)
运行结果为:
[125000000250000000, 375000000250000000]
125000000250000000
375000000250000000
结果为: 500000000500000000
0:00:32.576957
我们来看一下上面的代码,我们使用了map()方法直接将进程的结果输出到part_result 列表中。下一篇文章中我们继续介绍多个线程之间的数据交互。