Python多线程(四):生产者消费者问题

上一篇:

生产者消费者问题是多线程中一个很经典并发协作的问题,这个问题主要包含两类线程,一个是生产者用于生产数据,另一个是消费者用于消费数据,两者操作同一个数据共享区域,这种模型在编程中非常常见,比如爬虫,生产者负责爬取链接,消费者负责解析链接所指向的网页内容。这种模型需要满足下面的两个特征:

  • 消费者在数据共享区域为空时阻塞,直到共享区域出现新数据。
  • 生产者在数据共享区域满时阻塞,直到数据共享区出现空位。

下面是一个简单的例子:

import threading
import time
import random
MAX_BUFF_LEN = 5

buff = []
lock = threading.Lock()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            lock.acquire()
            if len(buff) < MAX_BUFF_LEN:
                # 如果共享区域未满,生产数据
                num = random.uniform(0, 5)
                buff.append(num)
                print('生产者向共享区域加入%f' % num)
                lock.release()
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            lock.acquire()
            if buff:
                # 如果共享区非空,消费数据
                num = buff.pop(0)
                print('消费者消费掉%f' %num)
                lock.release()
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序强制结束!')

程序运行结果如下:

生产者向共享区域加入1.653411
消费者消费掉1.653411
生产者向共享区域加入2.176285
生产者向共享区域加入4.727504
生产者向共享区域加入3.053323
消费者消费掉2.176285
生产者向共享区域加入0.951072
消费者消费掉4.727504
^C程序强制结束!

在程序中设置两个进程为守护进程,并捕捉KeyboardInterrupt错误,一旦捕捉到就结束主线程,同时结束两个子线程。上面是一个生产者消费者模型的一个简单实现,通过共享变量的方式使两个线程互相通信来达成一致。共享变量是线程间通信的常用方法,只要记得在对共享变量进行操作时加锁,程序就不会有问题。

但是上面的代码也有问题,在于这种代码通过无限对共享变量访问的方式进行判断空还是满,这样也降低了效率。因为其中一个程序在明明知道buff满了或者空了的情况下还要进行无意义的循环,由于GIL机制,它会和其他线程争夺执行权。如果某一方在判断buff满了或者空了的情况下主动阻塞,直到另外一方通知它,它才恢复,这样就能最大化的效率。

Python中threading中的Condition类就是来帮助我们完成这件事的。它的waitnotify方法能够阻塞和通知一个线程,下面还是通过例子来了解一下:

import threading
import time
import random
MAX_BUFF_LEN = 5

buff = []
condition = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            condition.acquire()
            if len(buff) < MAX_BUFF_LEN:
                # 如果共享区域未满,生产数据
                num = random.uniform(0, 5)
                buff.append(num)
                print('生产者向共享区域加入%f' % num)
                condition.notify()
            else:
                # 如果共享区满,停止生产
                print('共享区满,生产者阻塞!')
                condition.wait()
            condition.release()
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            condition.acquire()
            if buff:
                # 如果共享区非空,消费数据
                num = buff.pop(0)
                print('消费者消费掉%f' %num)
                condition.notify()
            else:
                # 如果共享去空,停止消费
                print('共享区空,消费者阻塞!')
                condition.wait()
            condition.release()
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序强制结束!')

程序结果:

生产者向共享区域加入0.040350
消费者消费掉0.040350
共享区空,消费者阻塞!
生产者向共享区域加入3.266167
消费者消费掉3.266167
生产者向共享区域加入3.468917
^C程序强制结束!

上面的代码中,acquire方法实际上是获得锁,wait方法将线程阻塞,实际上是将锁释放。当一个线程调用notify方法时,另一个线程就被唤醒,但是这时候这个线程并没有调用wait或者release方法释放锁,因此另一个线程虽然醒过来了但是还是没有执行,直到这个线程将锁释放。

在使用共享变量的时候,需要时刻注意是否线程安全,非常不方便。好在是Python中提供了一个Queue类,它是线程安全的,有了它我们可以把注意力放在如何实现代码逻辑上,而不是过多的注意到线程安全上。在Python2.7中该模块名为Queue,而在Python3.6中该模块名为queue。使用Queue类改进的代码如下:

import threading
import time
import random
from queue import Queue

MAX_BUFF_LEN = 5

buff = Queue(MAX_BUFF_LEN)
condition = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            num = random.uniform(0, 5)
            buff.put(num)
            print('生产者向共享区域加入%f' % num)
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            num = buff.get()
            print('消费者消费掉%f' %num)
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序强制结束!')

Queue是一个FIFO队列,它的get方法和put方法分别是入队和出队,在入队和出队时获取了锁以保证线程安全,如果队列空或者满,默认情况下get方法和put方法自动阻塞。阻塞和唤醒的方式实质上是调用了Condition类的waitnotify方法。Queue类比较简单,推荐大家直接查看源码或者官方文档。

这里还有一篇写得非常好的博客,推荐大家去看看:Producer-consumer problem in Python

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 5,137评论 0 23
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,704评论 0 12
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    小徐andorid阅读 2,832评论 3 53
  • 本镜像采用官方原版app制作,集成Clover 4391,支持UEFI启动安装;如果卡+++请替换Drivers6...
    daliansky阅读 12,384评论 2 0
  • 大家好,我是侯俊玲,说实话看到这个主题时,我想了很久,我的梦想是什么?我好像是一只温水里的青蛙,跳不出这个圈子,消...
    侯俊玲阅读 427评论 0 0