Python 3的多进程
多进程库名叫multiprocessing。有几点记录一下:
-
multiprocessing
提供了本地和远端的并发,同时使用子进程机制避免了GIL,它可以在Unix和Windows运行。 -
multiprocessing
提供了一个叫Pool
的对象,它通过不同的进程给执行函数体赋予不同的输入值,以此实现数据并发(data parallelism)。
Pool 对象
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
这里要注意,一定要有if __name__ == '__main__'
参考Programming guidelines。
这个Pool对象或者Pool类看起来非常有用,熟悉之后用它写Python非常方便。它返回的是一个进程集合,隐藏了进程管理的操作。是批量处理进程的不二选择。你可以直接使用这些worker进程,比如说下面这个例子里,首先新建了4个worker进程,然后让它们完成平方计算,但是代码不需要关注哪个进程计算那个数字。当然你也可以手动新建4个进程,效果是一样的。
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
- multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
-
processes
是worker进程的数量,如果为None,则使用os.cpu_count()
的返回值 - 如果initializer不是None,那么worker进程在启动的时候会调用initializer(*initargs)
- maxtasksperchild代表一个worker进程在退出前能够完成的任务数,之后它会被替换为一个新的worker进程。
- 默认值为None,表示workder进程的生命周期和pool一样
- context是用来启动worker进程的上下文。
-
-
pool对象的方法只能被创建这个pool的进程调用。
- apply(func[, args[, kwds]]),调用方法func并阻塞,直到方法执行完毕。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]]),调用func,当方法执行完毕后调用callback。
- map(func, iterable[, chunksize]),用于批量调用func,chunksize代表了iterable里chunk的大小。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]]),同上,完成之后调用callback
- imap(func, iterable[, chunksize]),轻量版的map()
- imap_unordered(func, iterable[, chunksize]),和imap()一样,只是返回的结果顺序是不确定的。
- starmap(func, iterable[, chunksize]),和map()类似,只不过func返回的期望值为iterable的。
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]), 同上,结果时调用callback。
- close(), 停止接受更多的任务。当前pool里的任务都完成后,退出。
- terminate(),立刻停止所有worker进程。
- join(),等待workder进程退出。
-
apply_async()和map_async()类返回的类为AsyncResult,它的方法包括:
- get([timeout]),在结果到达时返回它,否则抛出异常。
- wait([timeout]),等待结果到达。
- ready(),返回调用操作是否完成。
- successful(),同ready(),但不会抛出异常。
Process 类
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
方法
-
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- 对于进程来说,
group
总是为None -
target
是子进程要运行的对象,args
是它的参数,kwargs
是键:target(args) -kwargs
-
name
是进程名 -
daemon
决定子进程是不是守护进程,如果是None,就继承父进程的daemon值
- 对于进程来说,
run(),上述
target
就是run要调用的对象,你也可以在子类中覆写这个方法,一般不用。start(),启动子进程,让进程开始执行run(), 它需要在创建进程之后立刻调用。
-
join([timeout])
- 阻塞当前进程,并开始执行子进程
- 如果timeout是None,父进程会阻塞直到子进程结束;如果timeout是一个正整数,则子进程会在timeout秒之后阻塞。
- 父进程可以调用join很多次,但是进程不能自己join自己。
- FYI,这和我在上文中所提的LWT中join的实现不一样,LWT中的join()和这里的start是一样的,LWT中的yeild()和这里的join类似。
name, is_alivel, daemon, pid, exitcode, quthkey, sentinel,一些常量。
terminate(),终止当前进程,注意当前进程的子孙不会被终结。
异常:ProcessError, BufferTooShort, AuthenticationError, TimeoutError
关于start方法
multiprocessing
是封装之后的库,它的start
方法的底层实现有三种,分别是:spawn, fork和forkserver,你可以用set_start_method()
选择其中一种,但一个程序只能设置一次;也可以使用get_context()
来获取当前的使用的上下文,后者返回了一个和multiprocessing
模块具有相同APIs的对象。大多数情况下,直接调用start()
就好了。
一些功能方法
- active_children(),返回当前进程的子进程列表;
-
cpu_count(),返回CPU的数量,不是当前进程所使用的CPU数量。后者可以用
len(os.sched_getaffinity(0))
获取。 - current_process(),返回当前进程对象。
- freeze_support(),Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
- get_all_start_methods(),返回所有支持的start方法,包括spawn, fork和forkserver。
- get_context(method=None),返回一个context对象,用起来和multiprocessing一样。
- get_start_method(allow_none=False),获取用于启动进程的start方法。
- set_executable(),为子进程设置Python解释器的路径。
- set_start_method(method),设置start方法。
进程通信的数据结构
进程通信的方法主要有两种:队列(Queues)和管道(Pipes),另外还有JoinableQueue和SimeleQueue。
Pipes
Pipe可以是单工或者双工的,却决于其参数Pipe[duplex=True]
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
管道的模式是一端进,一端出。但是它的文档指出如果两个进程同时从管道头读取或者从管道尾写入,则数据有可能被损坏(corrupted)。
Queue
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
-
multiprocessing.Queue
和queue.Queue
只是类似,不一样。 - 队列是进程安全和线程安全的,这说明Queue类包含了锁机制的实现。
- 根据文档,Queue是用Pipe和锁/信号量来实现的。
- 其他关于Queue的常用方法:qsize(), empty(), full(), put(obj[, block[, timeout]]), put_nowait(obj), get([block[, timeout]]), get_nowait()
- 还有一些不是很重要,大多数时候用不到的方法:close(), join_thread(), cancel_join_thread()
SimpeQueue
这是一个简化版的Queue,只实现了三个方法,类似于Pipe:empty(), get()和put(item)。
JoinableQueue
除了Queue中实现的方法,JoinableQueuee额外实现了task_done()和join()。
进程通信的Connection对象
Connection对象允许收发picklable对象或字符串,有点类似于socket程序。它一般使用Pipe创建。Picklable是指大约不超过32MB的数据。
- send(obj), recv(), 收发对象,太大的话会引发ValueError。
- fileno(),返回connection使用的文件描述符或句柄。
- close(),关闭connection对象,一般是一个pipe。
- poll([timeout]),返回是否有可以读取的数据
-
send_bytes(buffer[, offset[,size]]), recv_bytes([maxlength]), recv_bytes_into(buffer[, offset]),收发二进制的数据。
- send_bytes发送的数据不能超过32MB,否则抛出ValueError
- recv_bytes和recv_bytes_into: 如果没有东西接收会阻塞,如果对方关闭了连接则抛出EOFError;如果收到的数据超过了maxlength,抛出OSError。
- 例子
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
侦听和客户端
上述通信模式的进一步封装,由 multiprocessing.connection 模块提供:
- multiprocessing.connection.deliver_challenge(connection, authkey),发送一条任意消息,然后进入等待。如果对方返回的是使用authkey加密的正确内容,则发送welcome消息。使用HMAC加密。
- multiprocessing.connection.answer_challenge(connection, authkey),接收一条消息,使用authkey进行加密,然后返回加密后的消息,等待welcome消息。
- multiprocessing.connection.Client(address[, family[, authkey]]),设置客户端,尝试向地址为address的侦听器建立连接。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]]),设置侦听器,侦听address表示的地址范围。
- multiprocessing.connection.accept(), 在侦听段上接收一个连接,并返回一个Connection。
- close(),关闭侦听器相关的套接字或pipe。
- address,侦听器使用的底地址。
- last_accepted,返回最后接收的连接的address,没有则为None。
- multiprocessing.connection.wait(object_list, timeout=None),等待object_list中某个object对象准备完成,并返回准备完成的对象。
例子:
Server
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Client
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
wait的使用例子
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
进程同步
锁
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
锁是用的最多的,除了普通的锁,Python也支持递归锁,在后者中,同一个进程可以多次申请锁,但是必须要依次释放每一个锁。两种锁的用法是一样的,除了名字。递归锁是RLock()。
其他
其他的同步用的名词有:Barrier, BoundedSemaphore, Condition, Event, Semaphor。锁其实是一种特殊的信号量。
进程共享
共享ctypes对象
共享ctypes对象可以用来使用共享的内存,常见的对象有Value和Array,如:
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
- multiprocessing.Value(typecode_or_type, *args, lock=True)
- typecode_or_type决定了Value对象的返回对象类型,它可以是ctype对象或者是Value中使用的数据类型。
- args是参数
- lock不仅可以是True或是False,代表是否需要锁;也可以直接传入一个Lock或Rlock对象用来作为这个Value对象的锁。如果选择False,这个Value就不再是进程安全的了。
- multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
- 基本和Value差不多,size_or_initializer如果是一个数字,则代表了这个Array的大小,所有的元素会被初始化为0;它也可以是一个用于初始化的序列,序列的内容就是初始化的值,序列的大小就是Arrary的大小。
还有几个用来操作ctype类型对象的方法,它们属于sharedctypes模块:
- multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
- multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
- multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
- multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
- multiprocessing.sharedctypes.copy(obj)
- multiprocessing.sharedctypes.synchronized(obj[, lock])
管理者进程
使用Manager()
对象可以创建一条管理者进程,它可以用来存储Python对象,并允许其他本地或者远端的进程通过代理操作这些对象。支持的对象类型有: list, dict, Namespace, Lock, RLock, Semaphor, BoundedSemaphore, Condition, Event, Barrier, Queue, Value和Array 。
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
Manager类定义在 multiprocessing.managers 模块里:
-
BaseManager([address[, authkey]])
-
address
是这个管理者进程用来进行侦听的地址,如果值是None,则会任意选择一个地址。authkey
是一个二进制字符串,如果值为None,则会使用current_process().authkey
里的值。 - 其他方法:
-
start([initializer[, initargs]]),启动一条子进程用于启动这个manager。如果initializer不为None,则子进程启动的时候和调用
initializer(*initargs)
。 - get_server(),返回一个server对象,它代表了Manager控制下的真实服务器。
- connect(),将一个本地manager连接到一个远程的manager进程。
- shutdown(),关闭本manager使用的进程。
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])。
- address, 只读变量,值为manager的地址。
-
-
SyncManager
自定义manager
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
访问远端manager的例子
本地服务器
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
远程客户端
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地客户端
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理对象
A proxy is an object which refers to a shared object which lives (presumably) in a different process. The shared object is said to be the referent of the proxy. Multiple proxy objects may have the same referent.
这个属于那种很容易理解但是很难用文字表述的内容。举个例子:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
注意manager.list()
返回的是一个代理对象,因此manager.list([1,2,3]) == [1,2,3]
是False的。
- multiprocessing.managers. BaseProxy
- Proxy对象是BaseProxy子类的实例
- _callmethod(methodname[, args[, kwds]]),用来调用引用的方法
- _getvalue()返回引用的一份拷贝
-
__repr__
() 返回Proxy对象的描述 -
__str__
() 返回引用的描述
日志
logging 包可以支持日志记录,但是它没有使用共享锁,因此来自于不同进程的日志消息可能会混乱。
- multiprocessing.get_logger(),返回一个logger对象。如果是新建的logger,它的日志等级为
logging.NOTSET
。 - multiprocessing.log_to_stderr(),这个方法将get_logger()的日志重定向到
sys.stderr
上,格式为[%(levelname)s/%(processName)s] %(message)s
。
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0