python多进程下的生产者和消费者模型

https://blog.csdn.net/qq_36441027/article/details/105929246

一、生产者消费者模型介绍

1.1 为什么需要使用生产者消费者模型

生产者是指生产数据的任务,消费者是指消费数据的任务。当生产者的生产能力远大于消费者的消费能力,生产者就需要等消费者消费完才能继续生产新的数据,同理,如果消费者的消费能力远大于生产者的生产能力,消费者就需要等生产者生产完数据才能继续消费,这种等待会造成效率的低下,为了解决这种问题就引入了生产者消费者模型。

1.2 如何实现生产者消费者模型

进程间引入队列可以实现生产者消费者模型,通过使用队列无需考虑锁的概念,因为进程间的通信是通过队列来实现的;

生产者生产的数据往队列里面写,消费者消费数据直接从队列里面取,这样就对实现了生产者和消费者之间的解耦。

生产者 -- > 队列 <--消费者

二、Queue实现生产者消费者模型

2.1 消费者生产者模型代码

from multiprocessing import Process, Queue
import time
 
# 消费者方法
def consumer(q, name):
    while True:
        res = q.get()
        # if res is None: break
        print("%s 吃了 %s" % (name, res))
 
# 生产者方法
def producer(q, name, food):
    for i in range(3):
        time.sleep(1)  # 模拟生产西瓜的时间延迟
        res = "%s %s" % (food, i)
        print("%s 生产了 %s" % (name, res))
        # 把生产的vegetable放入到队列中
        q.put(res)
 
if __name__ == "__main__":
    #创建队列
    q = Queue()
    # 创建生产者
    p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
    c1 = Process(target=consumer, args=(q, "peter",))
    p1.start()
    c1.start()
 
    # p1.join()
    # q.put(None)
    print("主进程")

2.2 执行结果

2.2.1 直接执行上面的代码的结果

直接执行会出现一个问题就是生产者生产完了,没有向消费者发送一个停止的信号,所以消费者一直会一直阻塞在q.get(),导致程序无法退出。


为了解决上面的问题,让消费者消费完了生产者的数据之后自动退出,就需要在生产者进程介绍的时候往队列里面put一个结束信号,消费者拿到这个信号,就退出消费进程。

主要是两个地方修改 ,把下方代码的注释打开就可以实现消费者消费完接收到生产者的结束信号就退出消费者进程了。

def consumer():
    if res is None: break
 
if __name__ == "__main__":
p1.join()  
q.put(None)

2.2.2 把注释打开后的运行结果

把注释打开后,消费者拿到了生产者发送的结束信号,可以正常退出程序了。


但如果有n个消费者,就需要发送n个结束信号,这种方式就不是那么简洁,像下面的代码这样:

from multiprocessing import Process, Queue
import time
 
 
# 消费者方法
def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break
        print("%s 吃了 %s" % (name, res))
 
 
# 生产者方法
def producer(q, name, food):
    for i in range(3):
        time.sleep(1)  # 模拟生产西瓜的时间延迟
        res = "%s %s" % (food, i)
        print("%s 生产了 %s" % (name, res))
        # 把生产的vegetable放入到队列中
        q.put(res)
 
 
if __name__ == "__main__":
    # 创建队列
    q = Queue()
    # 创建生产者
    p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
    p2 = Process(target=producer, args=(q, "kelly2", "香蕉"))
    c1 = Process(target=consumer, args=(q, "peter",))
    c2 = Process(target=consumer, args=(q, "peter2",))
    c3 = Process(target=consumer, args=(q, "peter3",))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()
 
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print("主进程")

其实我们现在就是生产者生产完数据之后想往队列里面发送一个结束信号,python语言提供了另外一种队列JoinableQueue([maxsize])来解决这种问题

三、JoinableQueue实现生产者消费者模型

3.1 JoinableQueue方法介绍

JoinableQueue([maxsize]) : A queue type which also supports join() and task_done() methods

q.task_done():消费者使用此方法发出信号,表示q.get()的返回项目已经被处理。
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理;阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。

3.2 JoinableQueue实现生产者消费者模型源码

from multiprocessing import Process,JoinableQueue
import time
 
 
# 消费者方法
def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break
        print("%s 吃了 %s" % (name, res))
        q.task_done()  # 发送信号给q.join(),表示已经从队列中取走一个值并处理完毕了
 
 
# 生产者方法
def producer(q, name, food):
    for i in range(3):
        time.sleep(1)  # 模拟生产西瓜的时间延迟
        res = "%s %s" % (food, i)
        print("%s 生产了 %s" % (name, res))
        # 把生产的vegetable放入到队列中
        q.put(res)
    q.join()  # 等消费者把自己放入队列的所有元素取完之后才结束
 
 
if __name__ == "__main__":
    # q = Queue()
    q = JoinableQueue()
    # 创建生产者
    p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
    p2 = Process(target=producer, args=(q, "kelly2", "蓝莓"))
    # 创建消费者
    c1 = Process(target=consumer, args=(q, "peter",))
    c2 = Process(target=consumer, args=(q, "peter2",))
    c3 = Process(target=consumer, args=(q, "peter3",))
 
    c1.daemon = True
    c2.daemon = True
    c3.daemon = True
 
    p_l = [p1, p2, c1, c2, c3]
    for p in p_l:
        p.start()
    
    p1.join()
    p2.join()
    # 1.主进程等待p1,p2进程结束才继续执行
    # 2.由于q.join()的存在,生产者只有等队列中的元素被消费完才会结束
    # 3.生产者结束了,就代表消费者已经消费完了,也可以结束了,所以可以把消费者设置为守护进程(随着主进程的退出而退出)
 
    print("主进程")

3.3 运行结果

通过运行结果可以看出,生产者没有手动发送结束信号给消费者,而是通过JoinableQueue队列的方式也实现了生产者消费者模型。

原文链接:https://blog.csdn.net/qq_36441027/article/details/105929246

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容