标准库 multiprocessing 支持使用类似于 threading 的用法来创建管理线程,并且避免了 GIL 问题。
13.4.1 创建线程
from multiprocessing import Process
import os
def f(name):
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print('hello', name)
if __name__ == '__main__':
p = Process(target = f, args = ('bob',))
p.start()
p.join()
multiprocessing 还提供了 Pool 对象支持数据的并行操作。下面的代码并发计算二维数组每行的平均值。
from multiprocessing import Pool
from statistics import mean
def f(x):
return mean(x)
if __name__ == '__main__':
x = [list(range(10)), list(range(20, 30)), list(range(50, 60)), list(range(80, 90))]
with Pool(5) as p:
print(p.map(f, x))
13.4.2 进程间数据交换
使用 Queue 对象在进程间交换数据。
import multiprocessing as mp
def foo(q):
q.put('hello world!')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target = foo, args = (q,))
p.start()
p.join()
print(q.get())
也可以使用上下文对象 context 的 Queue 对象实现进程间的数据交换。
import multiprocessing as mp
def foo(q):
q.put('hello world')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target = foo, args = (q,))
p.start()
p.join()
print(q.get())
使用管道实现数据交换。
from multiprocessing import Process, Pipe
def f(conn):
conn.send('hello world') # 向管道中发送数据
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 创建管道对象
p = Process(target = f, args = (child_conn,)) # 将管道的一方作为参数传递给子进程
p.start()
print(parent_conn.recv()) # 通过管道的另一方接收数据
p.join()
使用共享内存实现进程间数据交换。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = a[i] * a[i]
if __name__ == '__main__':
num = Value('d', 0.0) # 实型
arr = Array('i', range(10)) # 整数型
p = Process(target = f, args = (num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
使用Manager 对象实现进程间数据交换。
Manager 对象控制一个拥有 list、dict、Lock、RLock、Semaphore、BoundeSemaphore、Condition、Event、Barrier、Queue、Value、Array、Namespace 等对象的服务端进程,允许其他进程访问这些对象。
from multiprocessing import Process, Manager
def f(d, l, t):
d['name'] = 'dfg'
d['age'] = 38
d['sex'] = 'Male'
d['affiliation'] = 'SDIBT'
l.reverse()
t.value = 3
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
t = manager.Value('i', 0)
p = Process(target = f, args = (d, l, t))
p.start()
p.join()
for item in d.items():
print(item)
print(l)
print(t.value)
13.4.3 进程同步
使用 Lock 对象实现进程同步。
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target = f, args = (lock, num)).start()
使用 Event 对象实现进程同步。
from multiprocessing import Process, Event
def f(e, i):
if e.is_set():
e.wait()
print('hello world', i)
e.clear()
else :
e.set()
if __name__ == '__main__':
e = Event()
for num in range(10):
Process(target = f, args = (e, num)).start()