一 python多进程multiprocessing 主要是process和pool两个类, pool类主要是两个方法:pool.apply_async和 pool.apply
1.Process 类
Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。
star() 方法启动进程,
join() 方法实现进程间的同步,等待所有进程退出。
close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是函数名字,需要调用的函数
args 函数需要的参数,以 tuple 的形式传入
参考; https://www.jb51.net/article/141825.htm
2. Pool 类
可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。
- Pool 对象调用 join 方法会等待所有的子进程执行完毕
- 调用 join 方法之前,必须调用 close
- 调用 close 之后就不能继续添加新的 Process 了
二.一些实际bug:
- 多进程的使用的时候 [p.join() for p in process_pool if p.is_alive()] 的位置很关键,如果在for 循环里面则输出还是单次创建一个进程,然后杀死此进程,在创建新的进程,这个时候估计是阻塞的,
如果p.join 和 p.close 在 任务for循环 的同级(也就是外面),则是同时创建指定个数的进程,此时我的程序是参数process_count来控制,如果process_count等于8,那么就会同时创造八个进程打开八个浏览器窗口.
例子1:在for循环里面:阻塞式
def main(url, process_count):
process_pool=[]
for i in range(20000):
# sleep(1)
print(f"{i} is starting")
start_time=time.perf_counter()
while len(process_pool) >= process_count:
# time.sleep(1)
alive_pool=[]
for p in process_pool:
if p.is_alive():
alive_pool.append(p)
else:
p.close()
process_pool=alive_pool
p=multiprocessing.Process(target=add, args=(url,))
p.start()
process_pool.append(p)
[p.join() for p in process_pool if p.is_alive()]
print(f"{i} is close")
end_time=time.perf_counter()
logger.debug(f'handle {i} use time {end_time - start_time}')
例子2:在for循环外面,就会同时创建进程
运行结果:
2.启动多进程时的两个方法的对比:
import multiprocessing
from multiprocessing import Pool
# 多进程一
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p=Pool(4)
for i in range(20000):
print(f"{i} is starting")
p.apply_async(add, args=(url,))
sleep(2)
print(f"{i} is close")
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
# 多进程二
if __name__ == '__main__':
process_count=8
url="https://www.jianshu.com/p/8728110ad2b2"
main(url, process_count)
方法1: p.apply_async(add, args=(url,))
注意:如果传递给apply_async()的函数如果有参数,需要以元组的形式传递 并在最后一个参数后面加上 ,号,如果没有加, 号,提交到进程池的任务也是不会执行的方法2: p=multiprocessing.Process(target=add, args=(url,))
Python 多进程模块multiprocessing的使用
python的多进程主要通过multiprocessing来实现,主要使用到了两个类,一个是Process,一个是Pool,下面来详细说一下这两个类的使用场景。
* Pool
顾名思义这个类实现的是进程池功能,我们可以使用Pool类来高效地创建进程池,代码如下
from multiprocessing import Pool
import time
def test(arg):
print(arg)
time.sleep(3)
process_pool = Pool(4) # 进程的数量最好不要超过机器的CPU核数
for i in range(4):
process_pool.apply_async(test, (i))
process_pool.apply(test, (i))
process_pool.close() # close表示进程池不再接受新的进程
process_pool.join() # join表示等待进程池中的所有进程执行完,要不然主函数会直接执行完并关闭
* Pool的两个坑 (apply 和 apply_async)
在向进程池添加进程的时候有两种方法,一个是apply, 一个是apply_async
• apply函数会同步执行,一个任务执行完成后再执行下一个任务,就和同步代码一样一样的,所以如果你想实现多进程并发操作,不要用apply
• apply_async函数会异步执行,对,就是你想要的异步执行,但是有一个问题千万注意一下:apply_async函数会把你的所有的进程任务先加载到内存中,然后等待执行!这在一般情况下肯可能没有问题,但是有一种情况要小心:如果你的每一个进程需要加载很大的文件到内存,这个操作会把你的内存撑爆导致死机。
from multiprocessing import Process
def test(data):
print("push datat to db")
process_pool = []
process_count = 8
for file in file_list:
with open(file) as f:
data = f.readlines()
tasks = [data[:10000], data[10000:20000], data[20000:30000], data[30000:]]
for task in tasks:
# 如果进程池已满,就等待
while len(process_pool) >= process_count:
time.sleep(1)
# 将已经执行完毕的进程移除进程池
process_pool = [p for p in process_pool if p.is_alive()]
# 如果进程池有空位就将任务添加进去
p = Process(target=test, args=(task,))
p.start()
process_pool.append(p)
# 等待没有执行完成的进程执行完
[p.join() for p in process_pool if p.is_alive()]