ArrayBlockingQueue和LinkedBlockingQueue的异同
1.队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
2.数据存储容器不同,ArrayBlockingQueue采用的是数据作为数据存储容器,而LinkedBlockingQueue采用的则是以node节点为连接对象的链表。
3.由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据时,对于GC可能存在很大的影响。
4.两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加和移除操作采用的同一个ReentrantLock锁,而LinkedBlockingQueue使用的锁是分离的,添加采用的是putLock,移除采用的是takeLock,这样大大提高了队列的吞吐量,意味着高并发场景下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
1 LinkedBlockingQueue 参考
1.1linkedblockingqueue的基本概要
linkedblockingqueue是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,实际上我们对LinkedBlockingQueue的API操作都是间接操作该内部数据队列。
1.2 LinkedBlockingQueue构造函数
队列默认大小是Integer.MAX_VALUE,所以在使用的时候建议手动传值,为其提供我们所需的大小,避免队列过大造成机器负载或内存饱满等情况。
三个构造函数:
第一种默认构造方法创建一个容量为 Integer.MAX_VALUE的 LinkedBlockingQueue实例。
第二种构造方法,指定了队列容量,首先判断指定容量是否大于零,否则抛出异常。然后为 capacity 赋值,最后创建空节点,并指向 head与 last,两者的 item与 next此时均为 null。
第三种,利用循环向队列中添加指定集合中的元素。
LinkedBlockingQueue是按照FIFO(先进先出)排列元素。
队列的头部元素是入队时间最长的元素,队列的尾部元素是在入队时间最短的元素。
队列执行获取操作会获得位于队列头部的元素,而新元素会被插入到队列的尾部。
1.3 内部成员
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;
/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/**阻塞队列的头结点*/
transient Node<E> head;
/**阻塞队列的尾节点*/
private transient Node<E> last;
/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();
}
正常情况下,LinkedBlockingQueue的吞吐量要高于基于数组的队列ArrayBlockingQueue,因为前者的添加和删除操作使用的两个显示锁(ReentrantLock)来控制并发执行,而ArrayBlockingQueue只是使用一个ReentranLock控制并发。
两个显示锁(ReenTrantLock)一个控制头部,相当于控制新增元素;一个控制尾部,相当于控制删除元素;可以理解为读写分离。
每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加到链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock和putLock对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就大大提高吞吐量。
注意:如果没有给LinkedBlockingQueue指定容量大小,其默认值为Integer.MAX_VALUE,如果存在添加速度大于删除速度的时候,有可能会内存溢出,这点在使用前希望慎重考虑。
LinkedBlockingQueue的实现原理图和ArrayBlockingQueue是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的Condition条件对象作为等待队列,用于挂起take线程和put线程。
1.4 非阻塞式添加元素:add、offer方法原理
add方法
间接调用offer方法,如果成功返回true,如果失败抛出IllegalStateException队列已满的异常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer实现
offer方法做了两件事:
1.第一件事判断队列是否满,满了就直接释放锁,没满就将节点封装成Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象notFull上的添加线程。
2.第二件事判断是否需要唤醒等到在notEmpty条件对象上的消费线程。
public boolean offer(E e) {
//添加元素为null直接抛出异常
if (e == null) throw new NullPointerException();
//获取队列的个数
final AtomicInteger count = this.count;
//判断队列是否已满
if (count.get() == capacity)
return false;
int c = -1;
//构建节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);//添加元素
c = count.getAndIncrement();//拿到当前未添加新元素时的队列长度
//如果容量还没满
if (c + 1 < capacity)
notFull.signal();//唤醒下一个添加线程,执行添加操作
}
} finally {
putLock.unlock();
}
// 由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等到线程,因此count肯定会变化。
//这里的if条件表示如果队列中还有1条数据
if (c == 0)
signalNotEmpty();//如果还存在数据那么就唤醒消费锁
return c >= 0; // 添加成功返回true,否则返回false
}
}
enqueue入队操作
//入队操作
private void enqueue(Node node) {
//队列尾节点指向新的node节点
last = last.next = node;
}
signalNotEmpty唤醒 删除线程(如消费者线程)
//signalNotEmpty方法
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
//唤醒获取并删除元素的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
为什么添加完成后是继续唤醒在条件对象notFull上的添加线程,而不是像ArrayBlockingQueue那样直接唤醒notEmpty条件对象上的消费线程?
唤醒添加线程的原因,在添加新元素完成后,会判断队列是否已满,不满就继续唤醒在条件对象notFull上的添加线程,这点与前面分析ArrayBlockingQueue很不相同,在ArrayBlockingQueue内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为ArrayBlockingQueue只用了一个ReentrantLock同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于LinkedBlockingQueue来说就不一样了,其内部对添加线程和消费线程分别使用了各自的ReentrantLock锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,当然offer方法并不会挂起,而是直接结束,只有put方法才会当队列满时才执行挂起操作。注意消费线程的执行过程也是如此。
为什么判断if(c==0)时采取唤醒消费线程?
if (c == 0) //c拿到当前未添加新元素时的队列长度
signalNotEmpty();//如果还存在数据那么就唤醒消费锁
这是因为消费线程一旦被唤醒时一直在消费的(前提是有数据),所以C值一直在变化的,c值是添加完元素前队列的大小,此时c只可能是0或c>0,如果c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是c+1,那么有数据就直接唤醒等待消费线程,如果没有就结束啦,等待下一次的消费操作。如果c>0那么消费线程就不会被唤醒,只能等待下一个消费操作(poll,take,remove)调用,那么为什么不是条件c>0采取唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前c>0,那么很可能上一次调用消费线程后,数据没有被消费完,条件队列上也就不存在等待的消费线程了,所以c>0唤醒消费线程的意义不是很大,当然如果添加线程一直添加元素,那么一直c>0,消费线程执行的换就要等待下一次调用消费操作了(poll,take,remove)。
1.5 阻塞式添加元素:put 方法原理
lockInterruptibly()方法能够中断等待获取锁的线程。当两个线程同时通过lock.lockInterruptibly()获取某个锁时,假若此时线程A获取到了锁,而线程B只有等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。参考
//将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用
public void put(E e) throws InterruptedException {
//判断添加元素是否为null
if (e == null)
throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取插入的可中断锁
putLock.lockInterruptibly();
try {
try {
//判断队列是否已满
while (count.get() == capacity)
//如果已满则阻塞添加线程
notFull.await();
} catch (InterruptedException ie) {
//失败就唤醒添加线程
notFull.signal();
throw ie;
}
//添加元素
insert(e);
//修改c值
c = count.getAndIncrement();
//根据c值判断队列是否已满
if (c + 1 < capacity)
//未满则唤醒添加线程
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//c等于0代表添加成功
if (c == 0)
signalNotEmpty();
}
总结一下添加操作流程:
1.获取putLock锁
2.如果队列已满,阻塞添加线程(notFull.await())
3.元素入队
4.当前生产者添加元素之后如果队列还没有满,则通知其他生产者添加元素(notFull.signal())
5.释放putlock锁
6.如果队列中已经有元素,则通知消费者。
在添加元素时,如果队列已满,那么新到来的put线程将被添加到notFull条件等待队列中,具体如下图所示:
notFull 条件队列与putLock 显示锁关联,而不是与takeLock显示锁关联。putLock 显示锁负责对元素添加进行同步,具体的代码如下:
/** putLock显示锁 */
private final ReentrantLock putLock = new ReentrantLock();
/**putLock 条件队列与putLock显示锁关联 */
private final Condition notFull = putLock.newCondition();
1.6非阻塞式移除:poll方法原型
poll操作流程:
1.如果队列没有数据,就返回null
2.加takeLock锁。
3.如果有数据就poll方法取出来,取到之后,如果队列还有数据,那么唤醒等待在条件对象notEmpty上的消费线程。
4.释放takeLock锁。
5.判断c==capacity,如果为true则唤醒添加线程。
public E poll() {
//获取当前队列的大小
final AtomicInteger count = this.count;
if (count.get() == 0)//如果没有元素直接返回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//判断队列是否有数据
if (count.get() > 0) {
//如果有,直接删除并获取该元素值
x = dequeue();
//当前队列大小减一
c = count.getAndDecrement();
//如果队列未空,继续唤醒等待在条件对象notEmpty上的消费线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//判断c是否等于capacity,这是因为如果满说明NotFull条件对象上
//可能存在等待的添加线程
if (c == capacity)
signalNotFull();
return x;
}
dequeue从头部删除元素
private E dequeue() {
Node<E> h = head;//获取头结点
Node<E> first = h.next; 获取头结的下一个节点(要删除的节点)
h.next = h; // help GC//自己next指向自己,即被删除
head = first;//更新头结点
E x = first.item;//获取删除节点的值
first.item = null;//清空数据,因为first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点
return x;
}
1.7阻塞移除元素:take方法原理
主要做了两件事:
1.如果队列中没有数据就挂起当前线程到notEmpty条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程。
2.尝试唤醒条件对象notFull上等待队列中的添加线程。
public E take() throws InterruptedException {
E x;
int c = -1;
//获取当前队列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中断
try {
//如果队列没有数据,挂机当前线程到条件对象的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在数据直接删除并返回该数据
x = dequeue();
c = count.getAndDecrement();//队列大小减1
if (c > 1)
notEmpty.signal();//还有数据就唤醒后续的消费线程
} finally {
takeLock.unlock();
}
//满足条件,唤醒条件对象上等待队列中的添加线程
if (c == capacity)
signalNotFull();
return x;
}
1.8提取元素:peek和element
public E element() {
E x = peek();//直接调用peek
if (x != null)
return x;
else
throw new NoSuchElementException();//没数据抛异常
}
peek方法从头结点直接就可以获取到第一添加的元素,所以效率是比较高的,如果不存在则返回null。
//获取但不移除此队列的头;如果此队列为空,则返回 null
public E peek() {
//判断元素数是否为0
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
//获取获取锁
takeLock.lock();
try {
//头节点的 next节点即为添加的第一个节点
Node<E> first = head.next;
//如果不为空则返回该节点
if (first == null)
return null;
else
return first.item;
} finally {
//释放锁
takeLock.unlock();
}
}
从代码来看,head头结节点在初始化时是本身不带数据的,仅仅作为头部head方便我们执行链表的相关操作。peek返回直接获取头结点的下一个节点返回其值,如果没有值就返回null,有值就返回节点对应的值。
1.9 移除元素 remove的实现原理
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();//同时对putLock和takeLock加锁
try {
//循环查找要删除的元素
for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {
if (o.equals(p.item)) {//找到要删除的节点
unlink(p, trail);//直接删除
return true;
}
}
return false;
} finally {
fullyUnlock();//解锁
}
}
//两个同时加锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
remove方法删除指定的对象,这里我们可能会诧异,为什么同时对putLock和takeLock加锁?这是因为remove方法删除的数据的位置不确定,为了避免造成并非安全问题,所以需要对2个锁同时加锁。
2 ArrayBlockingQueue
ArrayBlockingQueue采用的是循环数组的形式表达队列,所以在该类中不存在扩容的情况,对于数组的长度来说,根据初始化的参数为标准,类中没有默认的数组长度。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;//存放元素的数组
int takeIndex;//记录元素被取出元素的数组下标
int putIndex;//记录元素被放入元素的数组下标
int count;//记录元素的个数
final ReentrantLock lock;//锁,同样也是保证多线程安全的一个重要因素
private final Condition notEmpty;
//notEmpty是当前lock的阻塞队列
//作用就是采用内部的一个Condition队列来存储想通过put进行添加元素,但由于数组已满而被阻塞的线程。
private final Condition notFull;
//notFull是当前lock的另一个阻塞队列
//作用就是采用内部的Condition队列来存储想通过take进行取出元素,但由于数组为空而被阻塞的线程。
transient Itrs itrs = null;
}
int count 这个成员是用来记录数组中当前拥有的元素的个数,那么通过添加和取出都要对这个变量进行操作,而这个成员并没有采用volitile进行修饰。但是该类采用了一个ReentrantLock lock,那我们就可以得出结论,这个类在所有的方法上都采用lock进行多线程安全的保证。所以对于count来说可以不添加任何有关多线程安全的修饰。
poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//这里的lock就是ArrayBlockingQueue类中唯一的锁lock。
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
offer(E e)
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
put(E e)
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
既然说到了阻塞线程需要被唤醒,何时被唤醒?
对于put来说,表明数组中没有位置,所以无法put,只能阻塞,当一旦有poll操作的时候,poll方法会调用一个dequeue方法这个方法就是将数组中最老的元素进行取出,关键是在最后有一句 notFull.signal();
采用这句进行唤醒在notFull阻塞队列中的线程,被唤醒的线程加入到lock的等待队列中,等待机会执行。
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
对于take来说,数组为空时,无法获取,进行阻塞,当一旦有offer操作的时候,offer操作会调用enqueue()方法,进行元素的添加,在最后进行notEmpty.signal();唤醒等待取操作的线程,被唤醒的线程加入到lock的等待队列中,等待机会执行。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}