我们在多线程 (Threading) 里提到过, 它是有劣势的, GIL 让它没能更有效率的处理一些分摊的任务。而现在的电脑大部分配备了多核处理器, 多进程Multiprocessing
能让电脑更有效率的分配任务给每一个处理器, 这种做法解决了多线程的弊端,也能很好的提升效率。
1、添加进程 process
其实 python 中,进程和线程的运用很相似:
import multiprocessing as mp
import threading as td
def job(a, d):
print('aaaaa')
if __name__ == '__main__':
p1 = mp.Process(target = job, args = (1, 2))
p1.start()
p1.join()
2、存储进程输出 Queue
import multiprocessing as mp
def job(q):
res = 0
for i in range(1000):
res += i + i**2 + i**3
q.put(res) #queue
if __name__=='__main__':
q = mp.Queue()
#定义两个线程函数,用来处理同一个任务,
# args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错
p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1)
print(res2)
3、效率对比
import multiprocessing as mp
import threading as td
import time
def job(q):
res = 0
for i in range(1000000):
res += i+i**2+i**3
q.put(res) # queue
def multicore(): # 多进程是多核运算
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:' , res1+res2)
def multithread():
q = mp.Queue() # thread 可放入 process 同样的 queue 中
t1 = td.Thread(target=job, args=(q,))
t2 = td.Thread(target=job, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:', res1+res2)
def normal():
res = 0
for _ in range(2):
for i in range(1000000):
res += i+i**2+i**3
print('normal:', res)
if __name__ == '__main__':
st = time.time()
normal()
st1= time.time()
print('normal time:', st1 - st)
multithread()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore()
print('multicore time:', time.time()-st2)
运行结果:
normal: 499999666667166666000000
normal time: 1.2876827716827393
multithread: 499999666667166666000000
multithread time: 1.5620112419128418
multicore: 499999666667166666000000
multicore time: 0.6951398849487305
可以发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有一定的短板的。
4、进程池 pool
进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题。
import multiprocessing as mp
def job(x):
return x*x
进程池 Pool() 和 map()
Pool
和之前的Process
的不同点是丢向Pool
的函数有返回值,而Process
的没有返回值。(用Queue解决问题)
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
if __name__ == '__main__':
multicore()
自定义核数量
我们怎么知道 Pool 是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开 CPU 负载看下 CPU 运行情况:
打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,
def multicore():
pool = mp.Pool(processes=3) # 定义CPU核数量为3
res = pool.map(job, range(10))
print(res)
apply_async()
Pool 除了 map()外,还有可以返回结果的方式,那就是 apply_async().
apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值。
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())
运行结果:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
4 # apply_async()
用 apply_async() 输出多个结果
#可以将apply_async() 放入迭代器中,定义一个新的multi_res:
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
#同样在取出值时需要一个一个取出来:
print([res.get() for res in multi_res])
总结
-
Pool
默认调用是CPU的核数,传入processes
参数可自定义CPU核数 -
map()
放入迭代参数,返回多个结果 -
apply_async()
只能放入一组参数,并返回一个结果,如果想得到map()
的效果需要通过迭代
5、共享内存 share memory
只有用共享内存才能让 CPU 之间有交流。学习资料:Python C type code种类。
Shared Value
我们可以通过使用Value
数据存储在一个共享的内存表中。
import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
其中d
和i
参数用来设置数据类型的,d
表示一个双精浮点类型,i
表示一个带符号的整型。更多的形式请查看本页最后的表.
Shared Array
在Python的mutiprocessing
中,还有一个Array
类,可以和共享内存交互,来实现在进程之间共享数据。
array = mp.Array('i', [1, 2, 3, 4])
这里的Array
和numpy中的不同,它只能是一维的,不能是多维的。同样和Value
一样,需要定义数据形式,否则会报错。
参考数据形式
各参数代表的数据类型
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
6、进程锁 Lock
def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()