一、多进程
说到 Python 多进程必然会有
multiprocessing
模块。但是在说multiprocessing
模块前,先看一下子进程是如何创建的。
1. 创建子进程 fork
再 Linux 或 Unix 系统中 提供了一个叫做
fork
的函数。
系统调用fork
用于从已存在进程中创建一个新进程,新进程称为子进程,而原进程称为父进程。
-
fork
调用一次,返回两次,两次返回值分别是:
(1)在父进程中的返回值是子进程的进程号
(2)在子进程中的返回值则返回 0
fork 返回值:
< 0 子进程创建失败
= 0 在子进程中的返回值
> 0 在父进程中的返回值
-
fork
创建的子进程特性如下
(1)子进程会继承父进程几乎全部代码段(包括 fork
前所定义的所有内容)
(2)子进程拥有自己独立的信息标识,如PID
(3)父、子进程独立存在,在各自存储空间上运行,互不影响
(4)创建父子进程执行不同的内容是多任务中固定方法
-
示例
再 Python 的 os
模块中提供了 fork
函数。下面来看一个创建子进程的例子:
#!coding=utf-8
import os
from time import sleep
pid = os.fork()
if pid < 0:
print("create process failed")
elif pid == 0:
print "子进程PID %s" % os.getpid() # 子进程PID
print "父进程PID %s" % os.getppid() # 父进程PID
print "子进程执行过程"
else:
sleep(1)
print "=" * 30
print "子进程PID %s" % pid # 父进程中fork()返回值 = 子进程PID
print "父进程PID %s" % os.getpid() # 父进程PID
print "父进程执行过程"
结果如下:
子进程PID 26569
父进程PID 26528
子进程执行过程
==============================
子进程PID 26569
父进程PID 26528
父进程执行过程
[Finished in 1.4s]
代码中 sleep(1)
是为了等待子进程结束后,再结束父进程。
下面示例中,先结束父进程再结束子进程,看看 pid 的变化
#!coding=utf-8
import os
from time import sleep
pid = os.fork()
if pid < 0:
print("创建失败")
elif pid == 0:
sleep(5)
print "子进程PID %s" % os.getpid() # 子进程PID
print "父进程PID %s" % os.getppid() # 父进程PID
print "子进程执行过程"
else:
sleep(1)
print "子进程PID %s" % pid # 父进程中fork()返回值 = 子进程PID
print "父进程PID %s" % os.getpid() # 父进程PID
print "父进程执行过程"
print "=" * 30
执行结果如下:
子进程PID 26885
父进程PID 26844
父进程执行过程
==============================
子进程PID 26885
父进程PID 1
子进程执行过程
[Finished in 5.4s]
会发现子进程的父进程 pid 发生了改变。因为子进程原来的父进程已经结束,子进程变成了孤儿,所以由系统的 init 进程收留。
下面是使用 ps
查看 ppid 的变化
2. Python 多进程 multiprocessing
模块
-
multiprocessing.Process
创建一个进程
示例
from multiprocessing import Process
def hello(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=hello, args=('Fat',))
p.start()
p.join()
方法 | 说明 | 参数 |
---|---|---|
Process(target=function, args=(param, ), name='name') |
初始化进程 | 参数:target : 要执行的函数 args : 参数元祖的形式 name : 进程名字 |
start() |
启动进程 | 没有参数 |
join(timeout=None) |
主进程阻塞等待子进程的退出 | 参数:time_out : 超时时间 |
-
multiprocessing.Pool
进程池批量创建子进程
一般分为两种堵塞和非堵塞
非堵塞进程池
示例
#!coding=utf-8
from multiprocessing import Pool
import os
import time
import random
def task(name):
print('运行任务 %s (PID %s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 5)
end = time.time()
print('任务 %s 运行时间 %0.2f' % (name, (end - start)))
if __name__ == '__main__':
print('父进程PID %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(task, args=(i,))
print('等待所有子进程结束')
p.close()
p.join()
print('结束')
结果如下
父进程PID 27851.
等待所有子进程结束
运行任务 0 (PID 27893)...
运行任务 1 (PID 27894)...
运行任务 2 (PID 27895)...
运行任务 3 (PID 27896)...
任务 2 运行时间 1.82
运行任务 4 (PID 27895)...
任务 1 运行时间 2.33
任务 4 运行时间 1.04
任务 3 运行时间 3.24
任务 0 运行时间 3.86
结束
[Finished in 4.4s]
程序中进程池数量上线为4,执行次数为5,所以在结果中看到“运行任务 4 (PID 27895)...”出现在“任务 2 运行时间 1.82”和“任务 1 运行时间 2.33”中间。
因为要等待进程池中有空闲了,才会继续执行。
方法 | 说明 | 参数 |
---|---|---|
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) |
创建进程池 | 参数:processes : 创建子进程数量initializer :初始化pool中的worker的时候调用的初始化函数,例如你每一个worker需要连接数据库maxtasksperchild : 每个子进程最大的任务量,每干完几次任务后会销毁重建 |
`close | 防止将更多任务提交到池中。一旦完成所有任务,工作进程将退出。 | |
apply_async(func, args=(), kwds={}, callback=None) |
任务加入到子进程,非堵塞模式 | 参数: func : 需要执行的函数 'args': 出入要执行函数的参数 |
堵塞进程池
示例:
#!coding=utf-8
from multiprocessing import Pool
import os
import time
import random
def task(name):
print('运行任务 %s (PID %s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 5)
end = time.time()
print('任务 %s 运行时间 %0.2f' % (name, (end - start)))
if __name__ == '__main__':
print('父进程PID %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply(task, args=(i,))
print('等待所有子进程结束')
p.close()
p.join()
print('结束')
运行结果如下:
父进程PID 28050.
运行任务 0 (PID 28091)...
任务 0 运行时间 2.25
运行任务 1 (PID 28092)...
任务 1 运行时间 3.76
运行任务 2 (PID 28093)...
任务 2 运行时间 0.45
运行任务 3 (PID 28094)...
任务 3 运行时间 1.36
运行任务 4 (PID 28091)...
任务 4 运行时间 1.00
等待所有子进程结束
结束
[Finished in 9.3s]
可以看到一个一个的运行。
堵塞使用 apply()
方法
3. 进程间通讯 Queue
、Pipes
Queue
示例:
#!coding=utf-8
from multiprocessing import Process, Queue
import time
import os
def put(q):
print "PUT Queue 值的进程 PID %s" % os.getpid()
for i in range(1, 6):
print "PUT 值为:%s" % i
q.put(i)
time.sleep(i)
def get(q):
print "获取 Queue 值的进程 PID %s" % os.getpid()
while True:
value = q.get(True)
print('获取的值为:%s' % value)
if __name__ == '__main__':
q = Queue()
p_put = Process(target=put, args=(q,))
p_get = Process(target=get, args=(q,))
# 启动 PUT 进程
p_put.start()
# 启动 GET 进程
p_get.start()
# 等待 PUT 结束:
p_put.join()
# 强行终止 get 这个死循环进程
p_get.terminate()
运行结果
PUT Queue 值的进程 PID 59241
PUT 值为:1
获取 Queue 值的进程 PID 59242
获取的值为:1
PUT 值为:2
获取的值为:2
PUT 值为:3
获取的值为:3
PUT 值为:4
获取的值为:4
[Finished in 11.1s]
这里需要注意的是,multiprocessing.Queue
不支持 multiprocessing.Pool
。如果要在进程池中使用 Queue
使用 multiprocessing.Manager.Queue
Queue
其他方法
方法 | 说明 |
---|---|
q.qsize() | 返回当前队列的空间 |
q.empty() | 判断当前队列是否为空 |
q.full() | 判断当前队列是否满 |
q.put() | 放消息 |
q.get() | 获取消息 |
q.task_done() | 接受消息的线程调用该函数来说明消息对应的任务是否已经完成 |
q.join() | 等待队列为空,在执行别的操作 |
Pipe
示例:
Pipe
管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。Pipe()
方法返回一个元组(conn1, conn2)
参数
duplex
:
- 模式是 True,双工模式;conn1 和 conn2 是表示管道两端的 Connection 对象。
- False 单工模式,conn1只能接受,conn2只能用于发送。
#!coding=utf-8
from multiprocessing import Process, Pipe
import time
import os
def write(p):
print "PUT Queue 值的进程 PID %s" % os.getpid()
for i in range(1, 6):
print "PUT 值为:%s" % i
p.send(i)
time.sleep(i)
def read(p):
print "获取 Queue 值的进程 PID %s" % os.getpid()
while True:
value = p.recv()
print('获取的值为:%s' % value)
if __name__ == '__main__':
p_read, p_write = Pipe(False)
pw = Process(target=write, args=(p_write,))
pr = Process(target=read, args=(p_read,))
pw.start()
pr.start()
pw.join()
pr.terminate()
运行结果
PUT Queue 值的进程 PID 58896
PUT 值为:1
获取 Queue 值的进程 PID 58897
获取的值为:1
PUT 值为:2
获取的值为:2
PUT 值为:3
获取的值为:3
PUT 值为:4
获取的值为:4
PUT 值为:5
获取的值为:5
[Finished in 16.1s]
二、多线程
Python 再创建线程时是启动了一个真正的线程 Posix Thread。但是因为有一个GIL锁(Global Interpreter Lock)。所有线程再执行时都必须获取一个 GIL 锁。
因为 GIL 锁的存在,每个线程再执行一个定的数据量后,解释器会自动释放 GIL 锁,让下一个线程执行。
再 Python 多线程中,GIL 锁实际上把所有线程的执行代码都给上了锁, 每个线程是交替执行的。
所以,多线程在 Python 中只能交替执行,就算有 100 个线程跑在 100 核的 CPU 上,也只能用到1个核。
1. threading.Thread
创建线程
常用函数说明:
函数 | 说明 | 参数 |
---|---|---|
ThreadObject=threading.Thread(target, args=None, name=None) |
创建一个线程 | 参数: target : 需要线程执行的函数args : 给线程执行的函数传递参数 name : 线程名 |
threading.active_count() |
获取当前线程数量 | |
threading.current_thread() |
获取当前线程信息 | 返回当前 Thread 对象 |
threading.enumerate() |
返回正在运行的线程是一个 list | |
ThreadObject.start() |
启动线程 | |
ThreadObject.isAlive() |
查看线程是否活跃 | |
ThreadObject.getName() |
获取线程名 | |
ThreadObject.setName() |
设置线程名 | |
ThreadObject.join(time) |
等待线程终止,这阻塞调用线程直至线程的 join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。 |
示例:
#!coding=utf-8
import threading
import random
import time
def job(name):
start = time.time()
time.sleep(random.random() * 5)
print "当前线程的个数: %s" % threading.active_count()
print "线程的名: %s" % threading.current_thread().name
end = time.time()
print "执行时间: %s" % (end - start)
print name
if __name__ == "__main__":
print threading.active_count()
print threading.current_thread().name
# 创建线程 并开始执行线程
t1 = threading.Thread(target=job, name="Job1", args=('Thread1',))
t2 = threading.Thread(target=job, name="Job2", args=('Thread2',))
# # 使用start方法开始进程
t1.start()
t2.start()
结果为:
1
MainThread
当前线程的个数: 3
线程的名: Job2
执行时间: 1.19193983078
Thread2
当前线程的个数: 2
线程的名: Job1
执行时间: 3.69679808617
Thread1
[Finished in 4.1s]
2. multiprocessing.dummy
线程池
用法和
Pool
相同。from multiprocessing.dummy import Pool
这样加载池就可以了。
3. 数据同步
多线程中,所有数据都是共享的,如实多个线程同时修改某个数据,就恨尴尬了。所以在这里需要线程同步。
在
threading
中提供了Lock
和Rlock
可以实现简单的线程同步。
-
Lock
在 Lock
中有 acquire
和 release
方法。且这两个方法要同时出现。可以将其操作放到acquire和release方法之间。
from threading import Lock
lock = Lock()
lock.acquire()
lock.release()
-
Rlock
RLock
对象的 acquire()
/ release()
是可以嵌套的,当最后一个或者最外层 release()
执行结束后,锁才被设置为 unlocked。
-
方法说明
方法 | 说明 |
---|---|
acquire() |
获得锁。该方法等待锁被解锁,将其设置为 locked 并返回 True。 |
release() |
释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发 RuntimeError。 |
locked() |
如果锁被锁定,返回True。 |
示例:
#!coding=utf-8
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
global x
# 获得锁
lock.acquire()
x += self.num
time.sleep(1)
print(x)
# 释放锁
lock.release()
# 创建锁
lock = threading.RLock()
# lock = threading.Lock()
x = 0
thread_list = []
for i in range(1, 5):
# 创建线程
t = MyThread(i)
thread_list.append(t)
for i in thread_list:
i.start()