Python中的队列Queue
我们在工作中有时需要将数据保存在内存里,但有时需要对保存的数据顺序有要求,我们一般采用有序字典,其实也可以使用内置队列解决,而队列都是线程安全的,更高效。
-
1. 线程Queue,也就是普通的Queue,模块
queue
-
2. 进程Queue,在多进程与多线程时使用,模块
from multiprocessing import Queue
Queue的种类
模块queue都有的一下方法
Queue().qsize() 返回队列的大小
Queue().empty() 如果队列为空,返回True,否则False,如果在多进程的异步中,不准
Queue().full() 如果队列满了,返回True,否则False,如果在多进程的异步中,不准
Queue.put(item,block=True,timeout=None) 向队列放入对象
- item :对象元素
- block:默认为True,代表队列满了就一直等待阻塞,False则为不阻塞,满了会直接主动抛出异常
- timeout: 默认为None,阻塞等待的时间,如何设置block为True,时间到了,还不能放,抛出异常
Queue().put_nowait(item) 放入元素,如果队列已满,则不等待直接抛出异常
Queue().get(block=True, timeout=None) 返回队列中的元素,队列中为空,则阻塞一直等待有值为止,设置了timeout则到时间还没有抛出异常
Queue().get_nowait() 队列为空时,如果没有直接抛异常,不等待
Queue().join() 阻塞调用线程,知道队列中的所有任务被处理掉
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞
Queue().task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)
FIFO
queue.Queue(maxsize=0)
first in first out ,先进先出。
maxsize
指定队列中能存放元素的上限数字,一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果配置maxsize小于或为0时,代表队列大小无上限,默认为0。
from queue import Queue
# 创建队列对象
q = Queue(maxsize=0)
# 在队列尾部插入元素
q.put("0000")
q.put("1111")
q.put("2222")
print('LILO队列中所有的元素为:', q.queue) # deque([0, 1, 2])
# 返回并删除队列头部元素
print(q.get()) # 0
print(q.qsize()) # 2 获取队列的元素个数,因取出来一个,所以还剩下2
print(q.queue)
LIFO
-queue.Queue(maxsize=0)
last in first out ,后进先出,类似栈。
maxsize
同上。
from queue import LifoQueue
lifoQueue = LifoQueue()
lifoQueue.put(1)
lifoQueue.put(2)
lifoQueue.put(3)
print('LIFO队列', lifoQueue.queue)
# 返回并删除队列尾部元素
print(lifoQueue.get()) # 3
print(lifoQueue.get()) # 2
Priority
-queue.PriorityQueue(maxsize=0)
优先队列。
maxsize
同上。q.put((priority_number,item))
,默认priority_number越小,优先级越高,可以通过__lt__
来改变
import queue
import threading
class Job:
def __init__(self, priority, desc):
self.priority = priority
self.desc = desc
print("New Job:", desc)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented
def __lt__(self, other):
try:
return self.priority > other.priority
except AttributeError:
return NotImplemented
def process_Job(q):
while True:
next_job = q.get()
print(next_job.desc)
q.task_done()
q = queue.PriorityQueue()
q.put(Job(5, "Five Job"))
q.put(Job(15, "Fifteen Job"))
q.put(Job(1, "One Job"))
workers = [
threading.Thread(target=process_Job, args=(q,)),
threading.Thread(target=process_Job, args=(q,)),
]
for work in workers:
work.setDaemon(True)
work.start()
q.join()
这里,我们默认数值越大优先级越高,可以看到15先执行,然后再是5,1任务。这个例子展现了有多个线程在处理任务时,要根据get()时队列中元素的优先级来处理。
https://blog.csdn.net/u013288190/article/details/128810536
双向队列
- deque
队列两端都可以删除和新增
from collections import deque
dequeQueue = deque(['Eric', 'John', 'Smith'])
print(dequeQueue)
dequeQueue.append('Tom') # 在右侧插入新元素
dequeQueue.appendleft('Terry') # 在左侧插入新元素
print(dequeQueue)
dequeQueue.rotate(2) # 循环右移2次
print('循环右移2次后的队列', dequeQueue)
dequeQueue.popleft() # 返回并删除队列最左端元素
print('删除最左端元素后的队列:', dequeQueue)
dequeQueue.pop() # 返回并删除队列最右端元素
print('删除最右端元素后的队列:', dequeQueue)
多进程与多线程使用queue
import multiprocessing as mp
import threading as td
import time
def job(q):
res = 0
for i in range(10000000):
res += i+i**2+i**3
q.put(res) # queue
def multicore():
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:' , res1+res2)
def normal():
res = 0
for _ in range(2):#线程或进程都构造了两个,进行了两次运算,所以这里循环两次
for i in range(10000000):
res += i+i**2+i**3
print('normal:', res)
def multithread():
q = mp.Queue()
t1 = td.Thread(target=job, args=(q,))
t2 = td.Thread(target=job, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:', res1+res2)
if __name__ == '__main__':
st = time.time()
normal()
st1= time.time()
print('normal time:', st1 - st)
multithread()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore()
print('multicore time:', time.time()-st2)
''' 运算结果如下
normal: 4999999666666716666660000000
normal time: 15.330998182296753
multithread: 4999999666666716666660000000
multithread time: 14.570075511932373
multicore: 4999999666666716666660000000
multicore time: 8.55050778388977
'''