生产者和消费者模型-队列

队列(进程通信ipc)

队列主要用于解决进程间通信的问题,队列底层就是通过管道和锁的方式实现的。

代码示例:

from multiprocessing import Queue
import time

q=Queue(3)    # 指定队列的长度

#队列相关的操作方法
# put,get,put_nowait,get_nowait,full,empty
q.put(3)      # 向队列中存放数据,可以是任何类型的数据
q.put(3)
q.put(3)
print(q.full())   # 如果队列满,则返回 True, 否则返回 False

print(q.get())    # 依次取出数据
print(q.get())
print(q.get())
print(q.empty())  # 如果队列为空,则返回True,否则返回 False

主要方法

  • q.put(): 用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
  • q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
  • q.get_nowait():同q.get(False)
  • q.put_nowait():同q.put(False)
  • q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目
  • q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  • q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

线程Queue

同进程队列一样,线程也有对于的方法,叫做线程Queue.

import queue

q=queue.Queue(3)   # 队列:先进先出,指定队列的大小

q.put(1)  # 向队列中放入数据
q.put(2)
q.put(3)
print(q.get()) # 从队列中取出数据
# q.put(4)    # 当队列满后会等待有空闲位置时再放入
# q.put_nowait(4)  # 立即放入数据,不等待,如果队列已经满,则会报错。

q.put(4,block=False) # 与put_nowait()方法一样,设置不等待,直接放入
q.put(4,block=True,timeout=3)   # 等待,且超时时间为3s


优先级队列:

import queue

q=queue.PriorityQueue(3)  # 优先级队列

q.put((10,'a')) # 指定优先级,数字越小,优先级越高
q.put((-3,'b'))
q.put((100,'c'))

print(q.get())
print(q.get())
print(q.get())

# 输出结果:
(-3, 'b')
(10, 'a')
(100, 'c')

堆栈,后进先出:

import queue

q=queue.LifoQueue(3) # 堆栈:后进先出
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

输出:

3
2
1

生产者和消费者模型

为了避免死锁问题,能够解耦合,定义了生产者消费者模型。生产者只需要创造数据,然后将数据放入队列,消费者则从队列中取出数据,对数据进行消费。
下面是使用多进程实现了简单的生产者和消费者模型:

from multiprocessing import Process,Queue
import random
import time

def producer(name,food,q):
    for i in range(10):
        res='%s%s' %(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print("厨师[%s]生产了<%s>" %(name,res))

def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('吃货[%s]吃了<%s>' %(name,res))

if __name__=='__main__':
    q=Queue()
    p1=Process(target=producer,args=('andy','包子',q))
    c1=Process(target=consumer,args=('bob',q))

    p1.start()
    c1.start()
    print('主进程')

在实际的应用中,可能会有多个生产者和消费者,而且我们必须保证在生产者已经生产完数据,并且消费者消费完数据后程序正常退出,所以这里需要使用到JoinableQueue模块。

from multiprocessing import Process,JoinableQueue   # 导入可以使用join方法的模块
import random
import time

def producer(name,food,q):
    for i in range(3):
        res='%s%s' %(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print("厨师[%s]生产了<%s>" %(name,res))

def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('吃货[%s]吃了<%s>' %(name,res))
        q.task_done()    # 通过使用队列的task_done方法,通知每一次从队列取出的信息

if __name__=='__main__':
    q=JoinableQueue()
    p1=Process(target=producer,args=('andy','包子',q))
    p2=Process(target=producer,args=('Tom','包子',q))

    c1=Process(target=consumer,args=('bob',q))
    c2=Process(target=consumer,args=('Lucy',q))
    c3=Process(target=consumer,args=('David',q))
    c1.daemon=True     # 设置为守护进程,当主进程运行完毕时,此子进程也退出
    c2.daemon=True
    c3.daemon=True

    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()

    p1.join()     # 等待生产子进程运行结束
    p2.join()
    q.join()      # 等待队列为空 后结束主进程
    print('主进程')



说明:

  • JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 队列 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道...
    knot98阅读 280评论 0 0
  • 进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe) 进程间通信 ...
    go以恒阅读 1,809评论 0 3
  • 顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。进程的概念起源于操作系统,是操作系统最核心的概...
    SlashBoyMr_wang阅读 1,179评论 0 3
  • 什么是进程 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单...
    可笑的黑耀斑阅读 1,028评论 0 0
  • @(python)[笔记] 目录 一、什么是进程 1.1 进程的概念 进程的概念起源于操作系统,是操作系统最核心的...
    CaiGuangyin阅读 1,281评论 0 9