看这篇文章,如果想要理解其实现原理的话,可以先看我上一篇文章,ABQ的源码和实现原理的分析;
https://www.jianshu.com/p/4da0470f0b87
两个的实现原理是一样的,区别就是:
相同点:ArrayBlockingQueue和的LinkedBlockingQueue都是通过条件通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;
不同点:1。ArrayBlockingQueue底层是采用的数组进行实现,而的LinkedBlockingQueue则是采用链表数据结构;
ArrayBlockingQueue插入和删除数据,只采用了一个锁,而的LinkedBlockingQueue的英文则在插入删除状语从句:采用分别了putLock状语从句:takeLock,这样可以降低线程由于线程无法获取到锁而进入WAITING状态的可能性,从而提高了线程并发执行的效率。
LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= o) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//头指针和尾指针指向头节点(null)
}
public LinkedBlockingQueue(Collection<? extends E> c ) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作
//,同样也是为了保证其可见性。
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));//入队
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
在第12行中获取锁是为了保证可见性,原因我认为是线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。
然后来看下几个重要的属性:
//链表队列的最大长度
private final int capacity;
//链表队列的长度,数量
private final AtomicInteger count = new AtomicInteger();
//队列的头节点
transient Node<E> head;
//队列的尾节点
private transient Node<E> last;
//类似下面的生产线程
private final ReentrantLock takeLock = new ReentrantLock();
//类似消费者线程的作用。
private final ReentrantLock putLock = new ReentrantLock();
//可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。
//而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。
private final Condition notFull = putLock.newCondition();
private final Condition notEmpty = takeLock.newCondition();
可以看出与ArrayBlockingQueue主要的区别是,的LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的锁(takeLock和putLock)来控制线程安全的,因此,也由这两个锁定生成了两个对应的条件(notEmpty和notFull)来实现可阻塞的插入和删除数据并且,采用了链表的数据结构来实现队列,节点结点的定义.
入队列第一种处理:
add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue
//该方法在AbstractQueue中,子类(LBQ)通过重写offer方法,调用父类的add方法。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//LBQ重写的
public boolean offer(E e) {
//如果队列为null,则抛异常
if (e == null) throw new NullPointerException();
//为了保证可见性,count设置成局部变量AtomicInteger 原子类
final AtomicInteger count = this.count;
if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false
return false;
int c = -1;
Node<E> node = new Node<E>(e); //封装成node
final ReentrantLock putLock = this.putLock; //插入锁
putLock.lock(); //获取锁,后面的获取失败会阻塞。
try {
if (count.get() < capacity) {
enqueue(node); //入队列
c = count.getAndIncrement(); #标注1 //使用CAS操作将队列数量的count加一。
//注意,这里返回的是自增前的值
if (c + 1 < capacity) //如果入队列后,队列当前数量仍然小于队列总容量,如果有入队列线程阻塞,则随机唤醒一个。
//执行入队列线程。
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) //看上面#标注1,因为c等于自增前的操作,那么这里是第一次加入队列。
//这里可以这样理解,如果一个队列为null那么有出队列操作的话,会阻塞,只有当第一次开始
//入队列后,就唤醒该出队列阻塞线程。
signalNotEmpty();
return c >= 0;
}
//入队列
private void enqueue(Node<E> node) {
last = last.next = node;
}
//唤醒出队列阻塞线程。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
入队列第二种处理方式
offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。 见上面代码。
第三种处理方式
offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //区别,可响应中断是否锁
try {
while (count.get() == capacity) { //如果队列满,则自旋
if (nanos <= 0) //计算时间,超时返回false
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e)); //如果队列没有满,可以执行入队列
c = count.getAndIncrement(); //判断是否可以继续执行入队列线程的条件
if (c + 1 < capacity) #标注2
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) //这里跟上面的处理意思一样。第一次入队列后,才可以唤醒出队列线程。
signalNotEmpty();
return true;
}
重点来了: 看代码代码#标注2,很有意思,如果当前队列在加一个后,队列还没有满,那么可以继续随机唤醒一个阻塞的入队列线程。
第四种处理
put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) { //区别:队列满,则阻塞
notFull.await();
}
enqueue(node); //队列不满了,则入队列
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
里面很多跟前面的处理思路是一样的。
关于出队列的四种阻塞处理
看第一种处理
//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
//LBQ中重写
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock; //获取takeLock锁
takeLock.lock();
try {
if (count.get() > 0) { //如果队列不为null,则出队列
x = dequeue(); //出队列
c = count.getAndDecrement(); //count自减,返回自减前的值
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) //如果c(出队列前的值)等于当前队列的总容量,则表示队列已经不为满了。
//表示可以继续入队列,则唤醒入队列线程
signalNotFull();
return x;
}
//出队列操作,返回删除的头队列的值
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//唤醒入队列阻塞线程
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
第二种阻塞处理
poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 见上面。
第三种阻塞处理
poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); //响应中断
try {
while (count.get() == 0) { //如果当前队列为null,,如果超时还未获取到值,则返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue(); //出队列
c = count.getAndDecrement(); //这里返回CAS操作自减1,返回自减之前的值。
//如果出队列之前为队列数量的值大于1,
//则表示可以继续出队列,随机唤醒一个出队列线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) //如果队列执行出队列之前为满,则唤醒入队列线程
signalNotFull();
return x;
}
第四种处理方式
take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count; //获取当前队列数量
final ReentrantLock takeLock = this.takeLock; //获取锁
takeLock.lockInterruptibly(); //响应中断
try {
while (count.get() == 0) { //如果当前队列为null
notEmpty.await();
}
x = dequeue(); //出队列
c = count.getAndDecrement(); //获取队列自减之前的数量
if (c > 1) //参考上面的逻辑
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) //参考上面的逻辑
signalNotFull();
return x;
}
总结:
LBQ对比ABQ区别:
相同点:ArrayBlockingQueue和的LinkedBlockingQueue都是通过条件通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;
不同点:1。ArrayBlockingQueue底层是采用的数组进行实现,而的LinkedBlockingQueue则是采用链表数据结构;
ArrayBlockingQueue插入和删除数据,只采用了一个锁,而的LinkedBlockingQueue分别针对入队列和出队列使用了两个锁。
可以降低线程由于线程无法获取到锁而进入WAITING状态的可能性,从而提高了线程并发执行的效率。
整理不易,喜欢点个赞