上一篇:锁
生产者消费者问题是多线程中一个很经典并发协作的问题,这个问题主要包含两类线程,一个是生产者用于生产数据,另一个是消费者用于消费数据,两者操作同一个数据共享区域,这种模型在编程中非常常见,比如爬虫,生产者负责爬取链接,消费者负责解析链接所指向的网页内容。这种模型需要满足下面的两个特征:
- 消费者在数据共享区域为空时阻塞,直到共享区域出现新数据。
- 生产者在数据共享区域满时阻塞,直到数据共享区出现空位。
下面是一个简单的例子:
import threading
import time
import random
MAX_BUFF_LEN = 5
buff = []
lock = threading.Lock()
class Producer(threading.Thread):
def run(self):
global buff
while True:
lock.acquire()
if len(buff) < MAX_BUFF_LEN:
# 如果共享区域未满,生产数据
num = random.uniform(0, 5)
buff.append(num)
print('生产者向共享区域加入%f' % num)
lock.release()
time.sleep(random.uniform(0, 10))
class Consumer(threading.Thread):
def run(self):
global buff
while True:
lock.acquire()
if buff:
# 如果共享区非空,消费数据
num = buff.pop(0)
print('消费者消费掉%f' %num)
lock.release()
time.sleep(random.uniform(0, 10))
producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
producer.start()
consumer.start()
producer.join()
consumer.join()
except KeyboardInterrupt:
print('程序强制结束!')
程序运行结果如下:
生产者向共享区域加入1.653411
消费者消费掉1.653411
生产者向共享区域加入2.176285
生产者向共享区域加入4.727504
生产者向共享区域加入3.053323
消费者消费掉2.176285
生产者向共享区域加入0.951072
消费者消费掉4.727504
^C程序强制结束!
在程序中设置两个进程为守护进程,并捕捉KeyboardInterrupt
错误,一旦捕捉到就结束主线程,同时结束两个子线程。上面是一个生产者消费者模型的一个简单实现,通过共享变量的方式使两个线程互相通信来达成一致。共享变量是线程间通信的常用方法,只要记得在对共享变量进行操作时加锁,程序就不会有问题。
但是上面的代码也有问题,在于这种代码通过无限对共享变量访问的方式进行判断空还是满,这样也降低了效率。因为其中一个程序在明明知道buff
满了或者空了的情况下还要进行无意义的循环,由于GIL机制,它会和其他线程争夺执行权。如果某一方在判断buff
满了或者空了的情况下主动阻塞,直到另外一方通知它,它才恢复,这样就能最大化的效率。
Python中threading
中的Condition
类就是来帮助我们完成这件事的。它的wait
和notify
方法能够阻塞和通知一个线程,下面还是通过例子来了解一下:
import threading
import time
import random
MAX_BUFF_LEN = 5
buff = []
condition = threading.Condition()
class Producer(threading.Thread):
def run(self):
global buff
while True:
condition.acquire()
if len(buff) < MAX_BUFF_LEN:
# 如果共享区域未满,生产数据
num = random.uniform(0, 5)
buff.append(num)
print('生产者向共享区域加入%f' % num)
condition.notify()
else:
# 如果共享区满,停止生产
print('共享区满,生产者阻塞!')
condition.wait()
condition.release()
time.sleep(random.uniform(0, 10))
class Consumer(threading.Thread):
def run(self):
global buff
while True:
condition.acquire()
if buff:
# 如果共享区非空,消费数据
num = buff.pop(0)
print('消费者消费掉%f' %num)
condition.notify()
else:
# 如果共享去空,停止消费
print('共享区空,消费者阻塞!')
condition.wait()
condition.release()
time.sleep(random.uniform(0, 10))
producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
producer.start()
consumer.start()
producer.join()
consumer.join()
except KeyboardInterrupt:
print('程序强制结束!')
程序结果:
生产者向共享区域加入0.040350
消费者消费掉0.040350
共享区空,消费者阻塞!
生产者向共享区域加入3.266167
消费者消费掉3.266167
生产者向共享区域加入3.468917
^C程序强制结束!
上面的代码中,acquire
方法实际上是获得锁,wait
方法将线程阻塞,实际上是将锁释放。当一个线程调用notify
方法时,另一个线程就被唤醒,但是这时候这个线程并没有调用wait
或者release
方法释放锁,因此另一个线程虽然醒过来了但是还是没有执行,直到这个线程将锁释放。
在使用共享变量的时候,需要时刻注意是否线程安全,非常不方便。好在是Python中提供了一个Queue
类,它是线程安全的,有了它我们可以把注意力放在如何实现代码逻辑上,而不是过多的注意到线程安全上。在Python2.7中该模块名为Queue
,而在Python3.6中该模块名为queue。
使用Queue
类改进的代码如下:
import threading
import time
import random
from queue import Queue
MAX_BUFF_LEN = 5
buff = Queue(MAX_BUFF_LEN)
condition = threading.Condition()
class Producer(threading.Thread):
def run(self):
global buff
while True:
num = random.uniform(0, 5)
buff.put(num)
print('生产者向共享区域加入%f' % num)
time.sleep(random.uniform(0, 10))
class Consumer(threading.Thread):
def run(self):
global buff
while True:
num = buff.get()
print('消费者消费掉%f' %num)
time.sleep(random.uniform(0, 10))
producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
producer.start()
consumer.start()
producer.join()
consumer.join()
except KeyboardInterrupt:
print('程序强制结束!')
Queue
是一个FIFO队列,它的get
方法和put
方法分别是入队和出队,在入队和出队时获取了锁以保证线程安全,如果队列空或者满,默认情况下get
方法和put
方法自动阻塞。阻塞和唤醒的方式实质上是调用了Condition
类的wait
和notify
方法。Queue
类比较简单,推荐大家直接查看源码或者官方文档。
这里还有一篇写得非常好的博客,推荐大家去看看:Producer-consumer problem in Python