生产者和消费者模型-队列

队列(进程通信ipc)

队列主要用于解决进程间通信的问题,队列底层就是通过管道和锁的方式实现的。

代码示例:

from multiprocessing import Queue
import time

q=Queue(3)    # 指定队列的长度

#队列相关的操作方法
# put,get,put_nowait,get_nowait,full,empty
q.put(3)      # 向队列中存放数据,可以是任何类型的数据
q.put(3)
q.put(3)
print(q.full())   # 如果队列满,则返回 True, 否则返回 False

print(q.get())    # 依次取出数据
print(q.get())
print(q.get())
print(q.empty())  # 如果队列为空,则返回True,否则返回 False

主要方法

  • q.put(): 用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
  • q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
  • q.get_nowait():同q.get(False)
  • q.put_nowait():同q.put(False)
  • q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目
  • q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  • q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

线程Queue

同进程队列一样,线程也有对于的方法,叫做线程Queue.

import queue

q=queue.Queue(3)   # 队列:先进先出,指定队列的大小

q.put(1)  # 向队列中放入数据
q.put(2)
q.put(3)
print(q.get()) # 从队列中取出数据
# q.put(4)    # 当队列满后会等待有空闲位置时再放入
# q.put_nowait(4)  # 立即放入数据,不等待,如果队列已经满,则会报错。

q.put(4,block=False) # 与put_nowait()方法一样,设置不等待,直接放入
q.put(4,block=True,timeout=3)   # 等待,且超时时间为3s


优先级队列:

import queue

q=queue.PriorityQueue(3)  # 优先级队列

q.put((10,'a')) # 指定优先级,数字越小,优先级越高
q.put((-3,'b'))
q.put((100,'c'))

print(q.get())
print(q.get())
print(q.get())

# 输出结果:
(-3, 'b')
(10, 'a')
(100, 'c')

堆栈,后进先出:

import queue

q=queue.LifoQueue(3) # 堆栈:后进先出
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

输出:

3
2
1

生产者和消费者模型

为了避免死锁问题,能够解耦合,定义了生产者消费者模型。生产者只需要创造数据,然后将数据放入队列,消费者则从队列中取出数据,对数据进行消费。
下面是使用多进程实现了简单的生产者和消费者模型:

from multiprocessing import Process,Queue
import random
import time

def producer(name,food,q):
    for i in range(10):
        res='%s%s' %(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print("厨师[%s]生产了<%s>" %(name,res))

def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('吃货[%s]吃了<%s>' %(name,res))

if __name__=='__main__':
    q=Queue()
    p1=Process(target=producer,args=('andy','包子',q))
    c1=Process(target=consumer,args=('bob',q))

    p1.start()
    c1.start()
    print('主进程')

在实际的应用中,可能会有多个生产者和消费者,而且我们必须保证在生产者已经生产完数据,并且消费者消费完数据后程序正常退出,所以这里需要使用到JoinableQueue模块。

from multiprocessing import Process,JoinableQueue   # 导入可以使用join方法的模块
import random
import time

def producer(name,food,q):
    for i in range(3):
        res='%s%s' %(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print("厨师[%s]生产了<%s>" %(name,res))

def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('吃货[%s]吃了<%s>' %(name,res))
        q.task_done()    # 通过使用队列的task_done方法,通知每一次从队列取出的信息

if __name__=='__main__':
    q=JoinableQueue()
    p1=Process(target=producer,args=('andy','包子',q))
    p2=Process(target=producer,args=('Tom','包子',q))

    c1=Process(target=consumer,args=('bob',q))
    c2=Process(target=consumer,args=('Lucy',q))
    c3=Process(target=consumer,args=('David',q))
    c1.daemon=True     # 设置为守护进程,当主进程运行完毕时,此子进程也退出
    c2.daemon=True
    c3.daemon=True

    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()

    p1.join()     # 等待生产子进程运行结束
    p2.join()
    q.join()      # 等待队列为空 后结束主进程
    print('主进程')



说明:

  • JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 队列 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道...
    knot98阅读 275评论 0 0
  • 进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe) 进程间通信 ...
    go以恒阅读 1,777评论 0 3
  • 顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。进程的概念起源于操作系统,是操作系统最核心的概...
    SlashBoyMr_wang阅读 1,129评论 0 2
  • 什么是进程 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单...
    可笑的黑耀斑阅读 1,000评论 0 0
  • @(python)[笔记] 目录 一、什么是进程 1.1 进程的概念 进程的概念起源于操作系统,是操作系统最核心的...
    CaiGuangyin阅读 1,254评论 0 9