Initialization
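import threading
from collections import deque
from time import monotonic as time  # put()/get() use it for timeout deadlines
# Full and Empty, raised by put()/get(), are exception classes defined in the same module.

class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # One lock guards the deque and the unfinished_tasks counter.
        self.mutex = threading.Lock()
        # Notified when an item is added; get() waits on it.
        self.not_empty = threading.Condition(self.mutex)
        # Notified when an item is removed; put() waits on it.
        self.not_full = threading.Condition(self.mutex)
        # Notified when unfinished_tasks drops to zero; join() waits on it.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def _init(self, maxsize):
        self.queue = deque()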
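Internally, Queue stores its items in a collections.deque, which CPython implements as a doubly linked list of fixed-size blocks, so adding or removing an item at either end takes O(1) time.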
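In CPython, the _put and _get hooks that put() and get() call are one-line wrappers over this deque: _put appends on the right and _get pops from the left, which yields FIFO order. The sketch below reproduces those two operations on a bare deque; the helper names fifo_put and fifo_get are hypothetical, chosen only for illustration.

```python
from collections import deque

storage = deque()

def fifo_put(item):
    # Same operation as Queue._put: append at the right end.
    storage.append(item)

def fifo_get():
    # Same operation as Queue._get: pop from the left end, so the oldest item leaves first.
    return storage.popleft()

for job in ("a", "b", "c"):
    fifo_put(job)

order = [fifo_get() for _ in range(3)]
print(order)  # ['a', 'b', 'c']
```

Subclasses such as LifoQueue and PriorityQueue change only these hooks (and _init/_qsize); the locking logic stays the same.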
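Next, the monitor itself: a Lock serves as the mutex, and three condition variables built on that same lock handle thread scheduling, that is, moving threads from the blocked state back to the ready state.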
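Because all three condition variables wrap the same Lock, entering any of them acquires the one shared mutex. The check below relies on the attributes mutex, not_empty, not_full and all_tasks_done of the standard queue.Queue. These are CPython implementation details rather than documented API, so treat this as an illustration that may not hold on other versions or interpreters.

```python
import queue

q = queue.Queue(maxsize=1)

# CPython implementation details, not documented API.
before = q.mutex.locked()

results = {}
for name in ("not_empty", "not_full", "all_tasks_done"):
    cond = getattr(q, name)
    with cond:
        # Entering any of the three conditions holds the single shared mutex.
        results[name] = q.mutex.locked()

after = q.mutex.locked()
print(before, results, after)
# False {'not_empty': True, 'not_full': True, 'all_tasks_done': True} False
```

Sharing one lock is what lets put() check the size and append an item, or get() check and pop, as a single atomic step, no matter which condition a thread entered through.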
Queue Status
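To report the queue's state, we still need mutually exclusive access to the underlying deque, so reading this information requires the lock. Reading it never makes a thread wait or wakes one up, however, so no condition variable is involved.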
def _qsize(self):
    return len(self.queue)

def qsize(self):
    with self.mutex:
        return self._qsize()

def empty(self):
    with self.mutex:
        return not self._qsize()

def full(self):
    with self.mutex:
        return 0 < self.maxsize <= self._qsize()
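A short sketch of these status methods on the standard queue.Queue, using one bounded and one unbounded queue. Note the 0 < self.maxsize guard in full(): a queue created with maxsize=0 (the default) is never full.

```python
import queue

bounded = queue.Queue(maxsize=2)
unbounded = queue.Queue()  # maxsize=0: no upper bound

bounded.put("job-1")
bounded.put("job-2")
for i in range(1000):
    unbounded.put(i)

print(bounded.qsize(), bounded.full(), bounded.empty())  # 2 True False
print(unbounded.qsize(), unbounded.full())               # 1000 False
```

The lock only makes each individual read consistent. As soon as the method returns, the lock is released and another thread may change the queue, so a check such as "if not q.full(): q.put(x)" is not atomic. In multithreaded code, rely on put()/get() with block and timeout instead.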
Task Done
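Indicate that a formerly enqueued task is complete. Used by Queue consumer threads.
When a consumer takes a job from the queue, the queue has no way of knowing how far along that job is. task_done() acts as a record book: each time a consumer finishes a job, it calls task_done() to mark that job as done.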
def task_done(self):
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
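The counter guard is easy to observe with the standard queue.Queue: each put() adds one unfinished task, so calling task_done() more often than put() was called drives the counter below zero and raises ValueError.

```python
import queue

q = queue.Queue()
q.put("job")       # unfinished_tasks: 0 -> 1
q.get()
q.task_done()      # unfinished_tasks: 1 -> 0, threads blocked in join() are notified

error = None
try:
    q.task_done()  # would make the counter -1
except ValueError as exc:
    error = str(exc)

print(error)  # task_done() called too many times
```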
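task_done() itself only decrements unfinished_tasks. The key part is the call to self.all_tasks_done.notify_all(), which happens only when the counter reaches zero. The notify_all() method wakes up all threads waiting on the condition variable.
So which threads are waiting for this signal?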
def join(self):
    '''Blocks until all items in the Queue have been gotten and processed.

    When the count of unfinished tasks drops to zero, join() unblocks.
    '''
    with self.all_tasks_done:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
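For a queue, join() means waiting until every job has been completed. It is usually called by the main (coordinating) thread to block until the worker threads are done, much like Thread.join() in the threading module. Because the wait sits inside a while loop, join() re-checks unfinished_tasks after every wakeup and only returns once the counter is actually zero.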
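The typical producer/worker pattern around join() looks like this. This is a minimal sketch: the squaring "work", the three worker threads, and the None sentinel used to stop the workers are all illustrative choices, not part of the queue module.

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    while True:
        item = q.get()
        if item is None:       # hypothetical stop sentinel
            q.task_done()
            break
        with results_lock:
            results.append(item * item)
        q.task_done()          # exactly one call per successful get()

workers = [threading.Thread(target=worker) for _ in range(3)]
for t in workers:
    t.start()

for n in range(10):
    q.put(n)

q.join()   # returns only after task_done() has been called for all 10 items
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

for _ in workers:
    q.put(None)
for t in workers:
    t.join()
```

Each worker calls task_done() only after its result is recorded, so by the time q.join() returns, every result is already in the list.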
Producer
def put(self, item, block=True, timeout=None):
    '''Put an item into the queue.'''
    with self.not_full:
        if self.maxsize > 0:
            if not block:
                if self._qsize() >= self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() >= self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while self._qsize() >= self.maxsize:
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
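The producer puts a job into the queue. Entering "with self.not_full" acquires the shared mutex before the critical section. If the queue is bounded and full, put() does one of three things: it raises Full immediately (block=False), waits on not_full for as long as it takes (timeout=None), or waits until the deadline and then raises Full. After every wakeup the while loop re-checks the size. Once there is room, put() appends the job, increments unfinished_tasks, and calls not_empty.notify() to wake one waiting consumer.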
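Two of the three branches can be exercised with the standard queue.Queue on a queue with a single slot: block=False fails at once, while a timeout makes put() wait on not_full until the deadline passes. The 0.1-second timeout is an arbitrary value chosen for the demo.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("first")                     # fills the only slot

nonblocking_full = False
try:
    q.put("second", block=False)   # full and not allowed to wait: raises immediately
except queue.Full:
    nonblocking_full = True

waited = None
start = time.monotonic()
try:
    q.put("second", timeout=0.1)   # waits on not_full for up to 0.1 s, then gives up
except queue.Full:
    waited = time.monotonic() - start

print(nonblocking_full, waited)
```

The deadline is computed once (endtime) and the remaining time is recomputed after each wakeup, so an early wakeup that finds the queue still full does not restart the full timeout.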
Consumer
def get(self, block=True, timeout=None):
    '''Remove and return an item from the queue.'''
    with self.not_empty:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = time() + timeout
            while not self._qsize():
                remaining = endtime - time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item
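The consumer removes a job from the queue. It acquires the lock by entering "with self.not_empty" and, while the queue is empty, waits on not_empty (which put() signals), following the same block/timeout rules as put(). It then takes the job and calls not_full.notify() to tell a waiting producer that a slot has opened up. Note that get() does not touch unfinished_tasks: the consumer must call task_done() itself once it has finished processing the job.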
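The handoff between the two sides can be seen with the standard queue.Queue. In the sketch below, a consumer thread blocks in get() on an empty queue until the main thread's put() notifies not_empty. Afterwards, a get() with a timeout on the again-empty queue raises Empty. The short sleep only makes it likely that the consumer is already waiting; the outcome is the same either way.

```python
import queue
import threading
import time

q = queue.Queue()
received = []

def consumer():
    # Blocks in not_empty.wait() while the queue is empty.
    received.append(q.get())
    q.task_done()

t = threading.Thread(target=consumer)
t.start()
time.sleep(0.05)        # illustrative: give the consumer time to block
q.put("job")            # put() calls not_empty.notify(), waking the consumer
t.join(timeout=1)

timed_out = False
try:
    q.get(timeout=0.05)  # queue is empty again: raises Empty after about 0.05 s
except queue.Empty:
    timed_out = True

print(received, timed_out)
```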