LinkedBlockingQueue
- 基于链表实现的BlockingQueue,头尾使用不同的锁,吞吐量较高;
- 内部基于有界队列的逻辑队列,但如果创建时不指定大小,则会被设置为Integer.MAX_VALUE成为无界队列;
put方法:
不支持null对象的入队;
入队时候调用putLock的可中断获取锁版本;
count.get() == capacity 表示队列已经满了, 调用notFull.await();进入等待状态,利用while判断来避免虚假唤醒;
当满足条件时,enqueue,并对count进行++,如果count < capacity,则调用notFull.signal(); 这是因为不管有多少put线程等待,多个take线程或者多次take都只会在队列满的时候调用notFull.signal,只会唤醒一个线程,后续put线程的唤醒则依靠该处的notFull.singnal(),之所以这样做的原因在于,take中要调用notFull.signal()还需要获取putLock,这样就降低了效率;
-
最后如果put之前的count = 0,则可能会有take线程在等待,所以调用notEmpty来唤醒take线程,如果此处有多个take线程在等待,那么就会由其他的put线程(被唤醒的take操作在下一次put前完成了)或者take线程来唤醒(下一次put操作在take完成前完成,这里指对count的操作);
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
take方法:
take方法与put方法内容相似,不一一分析;
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}