offer
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到队列里面
q.offer(e);
// 如果入队的元素排在队首,说明该元素优先级最高,也就是最快过期的。
// 为什么一定要跟入队一致呢?
if (q.peek() == e) {
leader = null;
// 唤醒available,表示队列里面有东西可用了
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 拿到头节点
E first = q.peek();
if (first == null)
// 如果为空,一直等到头节点到来为止
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果已经超时,那么出栈头节点
if (delay <= 0)
return q.poll();
// 执行到这里说明队列不为空,且头节点并没有到期
first = null; // don't retain ref while waiting
// 如果leader线程存在,那么除了leader线程外,都无条件无限期等待available被唤醒
if (leader != null)
available.await();
else {
// 如果不存在leader,那么将当前线程设置为leader线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 且等待头节点超时,好再次自旋后返回该头节点
available.awaitNanos(delay);
} finally {
// 这里大部分是头节点到期,被超时唤醒。如果leader没变,那么先将leader置空
// 因为再次自旋后会返回头节点,那么leader也就完成任务了。
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 到这里说明上个leader线程所等待的头节点已经返回,leader已经重置
// 这个时候还有其他线程被await中,leader完成任务后,需要去唤醒其他线程
// 被唤醒的线程会进入新的轮询之中,并将自己设置为新的leader,并等待头节点到期
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
poll
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 如果头节点为空或头节点还没有到期,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}