下面我们来说一下java中的BlockingQueue。先来看一下BlockingQueue都有哪些方法
offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
remove(Object o): 从队列中移除指定的值。
contains(Object o): 判断队列中是否拥有该值。
put和take方法是阻塞的。下面我们就来看一下LinkedBlockingQueue是怎么实现的。
LinkedBlockingQueue是基于链表实现的,我们来看一下LinkedBlockingQueue都有哪些属性
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
有一个head,保存首节点的引用,一个last,保存尾节点的引用,还有一个tabkLock和Condition,用来在take操作的时候阻塞线程的,一个putLock和Condition,用来在put操作的时候阻塞线程的。
LinkedBlockingQueue有一个内部类Node,有一个属性item,代表元素,一个Node元素的next,代表下一个节点
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
下面我们来看一下put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
先将值放到Node节点中,然后上锁,如果当前队列的数量等于容量,则调用await方法阻塞等待,否则进行入队操作,然后如果队列的数量小于容量,则唤醒等待的线程继续执行,然后解锁,如果队列的数量为0,则唤醒take操作阻塞的线程
再来看一下take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
先上锁,然后判断如果队列数量等于0,则阻塞当前线程,否则从队列中取出元素,如果队列的数量大于0,则唤醒被阻塞的线程继续执行,然后解锁,如果队列的数量等于容量-1,则唤醒put操作阻塞的线程
LinkedBlockingQueue就分析到这里了。