进程和线程的概念
从系统调度和资源分配的角度来看,进程是 CPU 资源分配的最小单位,线程是 CPU 调度的最小单位。从 CPU 执行时间的角度来看,进程是包含了上下文切换的程序执行时间总和,线程是共享了进程的上下文环境的更为细小的 CPU 时间段。
多进程
Python 使用 multiprocessing 包来实现多进程。使用 multiprocessing 包我们只需要定义一个函数(Python 会完成其他所有事情)即可完成从单进程到多进程的转换。
使用 Process 类创建子进程
Python 使用 multiprocessing 模块提供的 Process 类来代表一个进程对象。
建立一个进程对象使用 Process(group=None, target=None, name=None, args=(), kwargs={}),其中group总是设置为None。target表示调用对象。name为别名。args表示调用对象的位置参数元组。kwargs表示调用对象的字典。
Process 类含有以下方法:
- start 启动进程
- join 等待进程执行完毕
- is_alive 进程是否处于活动状态
- terminate 终止进程,用在进程为死循环的情况下,手动终止进程。
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'):
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
# 建立一个进程对象
p = Process(target=f, args=('bob',))
# 启动新建的进程
p.start()
print p.is_alive()
# 等待进程执行完毕
p.join()
print p.is_alive()
运行结果如下:
main line
module name: __main__
parent process: 657
process id: 767
True
function f
module name: __main__
parent process: 767
process id: 768
hello bob
False
使用 Pool 类启动多个子进程
若果需要批量启动多个子进程,可以使用进程池的方式批量创建子进程
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x * x
if __name__ == '__main__':
pool = Pool(processes=4)
print pool.map(f, range(10))
for i in pool.imap_unordered(f, range(10)):
print i
res = pool.apply_async(f, (20,))
print res.get(timeout = 1)
res = pool.apply_async(os.getpid, ())
print res.get(timeout = 1)
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout = 1)
except TimeoutError:
print 'We lacked patience and got a multiporcessing.TimoutError'
运行结果如下:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
16
25
36
64
49
81
400
944
[946, 945, 947, 944]
We lacked patience and got a multiporcessing.TimoutError
进程间通信
multiprocessing 支持 Queues 和 Pipes 两种类型进行进程间通信。
Pipe 既管道模式,调用 pipe 将返回管道的两端的两个链接。pipe 仅仅适用于只有两个进程且一读一写的单双工模式,既信息只能从一个方向向另一个方向流动。
pipe 适用于读写小于要求高的一读一写的单双工模式。
from multiprocessing import Process, Pipe
def f(conn):
# 向主进程发送信息
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
# 建立 Pipe 并获取连接
parent_conn, child_conn = Pipe()
# 建立子进程
p = Process(target=f, args=(child_conn,))
p.start()
# 获取子进程发送的信息
print parent_conn.recv()
p.join()
运行结果如下:
[42, None, 'hello']
Queue 在 python 中是基于 Pipe 实现的,Queue 也是一边发送一边接收,Queue 与 Pipe 最大的区别是 Queue 允许同时又多个进程发送和接收数据。
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
运行结果如下:
Process to write: 1046
Put A to queue...
Process to read: 1047
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Queue 是一个线程和进程安全的队列。
Queues are thread and process safe.
多线程
相比于进程,线程更加的轻量,可以实现并发。Python 中的多线程实时上并非真正的多线程,这要从全局解释器锁(GIL)说起,Python 使用的解释器 Cpython 的线程是操作系统的原生线程,在解释器内执行的 Python 代码,都需要获取全局解释器锁才能执行,只有在遇到 I/O 操作时会释放全局解释器锁,由于 Python 的进程做为一个整体,因此解释器进程内只有一个线程在执行,其它的线程都处于等待状态等着全局解释器锁的释放。
若要尽量使用 CPU 资源使用多进程是不错的选择,对于 I/O 密集型操作使用多线程是不错的选择,Python 不适用于计算密集型应用。
在 Python 中使用 Threading 模块(Threading 模块是 Thread 的增强版本,Threading 从 Python 1.5.2 版本开始加入)来实现多线程操作。
使用 Threading 创建线程
看以下简单示例:
import threading
def threadHandler(number):
print 'thread %s is running...' % threading.current_thread().name
print number * 2
print 'thread %s ended.' % threading.current_thread().name
if __name__ == '__main__':
print 'thread %s is runnging...' % threading.current_thread().name
for i in range(5):
my_thread = threading.Thread(target=threadHandler, args=(i,))
my_thread.start()
print 'thread %s ended.' % threading.current_thread().name
运行结果如下:
thread MainThread is runnging...
thread Thread-1 is running...
0
thread Thread-1 ended.
thread MainThread ended.
thread Thread-2 is running...
2
thread Thread-2 ended.
thread MainThread ended.
thread Thread-3 is running...
4
thread Thread-3 ended.
thread MainThread ended.
thread Thread-4 is running...
6
thread Thread-4 ended.
thread MainThread ended.
thread Thread-5 is running...
8
thread MainThread ended.
thread Thread-5 ended.
由于任何进程默认都会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python 的 threading 模块的 current_thread() 函数,返回当前线程的实例。
线程锁与线程同步
由于线程共享了进程的上下文环境,所以在多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据最大的风险在于多个线程同个更改一个变量,将内容给改乱了。
我们来看一个多线程操作变量的示例:
import threading, time
total = 0
def update():
global total
for i in range(10000):
total = total + i
total = total - i
if __name__ == '__main__':
for i in range(10):
thread = threading.Thread(target=update, args=())
thread.start()
print total
以上示例的设计预期最终 total 的值应该为 0,但是在实际运行中会发现 total 的实际运行结果每次均不相同。造成这种情况的原因是由于 total 在多个线程中被修改造成存储内容的混乱。
现在我们在该示例代码中加入线程锁,有两种方法可以实现 try/finally 和 with。
以下示例使用 try/finally 来实现增加线程锁。
import threading, time
total = 0
lock = threading.Lock()
def update():
global total
for i in range(10000):
lock.acquire()
try:
total = total + i
total = total - i
finally:
lock.release()
if __name__ == '__main__':
for i in range(10):
my_thread = threading.Thread(target=update, args=())
my_thread.start()
print total
以下示例使用 with来实现增加线程锁。
import threading, time
total = 0
lock = threading.Lock()
def update():
global total
for i in range(10000):
with lock:
total = total + i
total = total - i
if __name__ == '__main__':
for i in range(10):
my_thread = threading.Thread(target=update, args=())
my_thread.start()
print total
在使用 try/finally 和 with 增加锁后,以上代码的运行结果均符合预期。
线程间通信
Queue 模块用来实现消息队列功能,可以实现线程间安全的消息交换。各个线程可以通过调用消息队列实例对消息队列进行操纵。
from Queue import Queue
import threading
import os, time, random
# 写数据线程执行的代码:
def write(q):
print 'thread to write: %s' % threading.current_thread().name
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
# 读数据线程执行的代码:
def read(q):
print 'Process to read: %s' % threading.current_thread().name
while q.empty() == False:
value = q.get(True)
print 'Get %s from queue.' % value
if __name__=='__main__':
# 父线程创建Queue,并传给各个子进程:
q = Queue()
pw = threading.Thread(target=write, args=(q,))
pr = threading.Thread(target=read, args=(q,))
# 启动子线程pw,写入:
pw.start()
# 等待pw结束:
pw.join()
# 启动子线程pr,读取:
pr.start()
运行结果如下:
thread to write: Thread-1
Put A to queue...
Put B to queue...
Put C to queue...
Process to read: Thread-2
Get A from queue.
Get B from queue.
Get C from queue.