本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。
概念
LinkedBlockingQueue按照api解释:一个基于链表而实现的有界阻塞队列。遵循先进先出原则,由队头入列,再从队尾出列。具体操作上跟ArrayBlockingQueue类似,区别在于底层维护数据上,LinkedBlockingQueue底层是一个链接,而ArrayBlockingQueue是一个数组。
内部结构
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final AtomicInteger count = new AtomicInteger(); //队列元素个数
private final int capacity; //队列容器
transient Node<E> head; //队头
private transient Node<E> last; //队尾
//出列入列过程中维护现场安全的各类锁
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
//队列数据节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
}
基本操作
public class App {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue(5);
//入列
queue.add("a"); //队列满后抛异常
queue.put("b");//队列满后阻塞
queue.offer("c"); //入列失败返回false
System.out.println(queue);
queue.put("a");
queue.put("b");
queue.put("c");
queue.put("d");
queue.put("e");
//出列
queue.remove("a"); //删除指定元素
queue.poll(); //出列,如果队列为空返回null
queue.take(); //队列为空,阻塞等待
System.out.println(queue);
}
}
一般推荐使用put入列, take出列
源码解析
构造-LinkedBlockingQueue提供了3个构造器,无参, 带有容量,带集合数据
//无参数时,默认容量为int的最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//带容量参数【推荐】
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化队头,队尾
last = head = new Node<E>(null);
}
//带数据集合
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE); //容量为int最大值
final ReentrantLock putLock = this.putLock;
putLock.lock(); //谨慎起见也加锁,需要将传入集合逐一入列
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
LinkedBlockingQueue 3个构造器,实际使用中更推荐使用指定容量的队列。
在继续看源码前,先了解一个原子操作类:AtomicInteger
//int 类型的原子操作,不指定初始为0
//底层维护了一个volatile 修饰变量 value = 0
AtomicInteger atomicInteger = new AtomicInteger();
//获取:value = 0
System.out.println(atomicInteger.get()); //0
//返回value值0, 然后value 值加一【这里也是原子操作】
System.out.println(atomicInteger.getAndIncrement()); //0
//返回value值1, 然后value 值加一【这里也是原子操作】
System.out.println(atomicInteger.getAndIncrement()); //1
//获取:value=2
System.out.println(atomicInteger.get()); //2
//int 类型的原子操作,不指定初始为0
//底层维护了一个volatile 修饰变量 value = 0
AtomicInteger atomicInteger = new AtomicInteger();
//获取:value = 0
System.out.println(atomicInteger.get()); //0
//返回value值0, 然后value 值减一【这里也是原子操作】
System.out.println(atomicInteger.getAndDecrement()); //0
//返回value值1, 然后value 值减一【这里也是原子操作】
System.out.println(atomicInteger.getAndDecrement()); //-1
//返回value值1
System.out.println(atomicInteger.get()); //-2
getAndIncrement : 返回未操作前value 的值, 然后加1
getAndDecrement : 返回未操作前value 的值, 然后减1
入列-put:将入列数据添加到队尾,如果队列满了,阻塞等待。
public void put(E e) throws InterruptedException {
//入列元素不允许为null
if (e == null) throw new NullPointerException();
//队列临时容量缓存,作为执行唤醒/阻塞线程操作标记
int c = -1;
Node<E> node = new Node<E>(e);
//入列锁
final ReentrantLock putLock = this.putLock;
//队列元素个数
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //入列前加锁,可中断锁
try {
//自旋排除硬件加锁延时问题
//如果队列已满,线程阻塞等待
while (count.get() == capacity) {
notFull.await();
}
//数据入列
enqueue(node);
//原子操作,入列后再获取队列元素个数,并+1,确保当前操作队列元素个数最新
c = count.getAndIncrement();
//c + 1 < capacity 表示队列未满,仍可添加,唤醒未持锁而等待的入列线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock(); //释放锁
}
//c == 0 说明队列为空,唤醒入列线程入列
if (c == 0)
signalNotEmpty();
}
从上面源码上看put方法其实做5件事:
1>判断元素是否null, 为null 抛异常
2>判断是否满列,满列则等待,此处需要留意while这个操作。
理想请求下,if即可,但是有种情况,如果jvm执行指令传到cpu到程序时间片执行存在一点的时间延时,while 重复执行,可以减少延时影响。
3>数据入列
4>入列后需要唤醒未持锁而等待的入列线程
5>c==0的判断, c的值是入列前容量值,如果为0说明入列前,队列为空,可以存在出列等待线程,所以在c==0的时候,且已经入列成功,可以唤醒出列等待线程,让其顺利出列。
出列-take : 出列,从队头弹出元素, 如果队列个数为0, 阻塞等待。
public E take() throws InterruptedException {
E x;
//队列临时容量缓存,作为执行唤醒/阻塞线程操作标记
int c = -1;
//队列元素个数
final AtomicInteger count = this.count;
//出列锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//出列前加锁,可中断锁
try {
//自旋排除硬件加锁延时问题
//如果队列已空,线程阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue(); //数据出列
//原子操作,出列后再获取队列元素个数,并-1,确保当前操作队列元素个数最新
c = count.getAndDecrement();
//c > 1 表示队列未空,仍可出列,唤醒未持锁而等待的出列线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//释放锁
}
//c == capacity 说明队列已满,唤醒出列线程出列
if (c == capacity)
signalNotFull();
return x;
}
上面源码看出,take操作跟put原理差不多,执行的是反向操作而已。需要注意的是take方法唤醒线程条件c 变量值判断条件。
1> 出列成功之后, c > 1 表示出列前队列至少有2个元素,所以出列成功后,唤醒未持锁而等待的出列线程
2>c == capacity 满足这个条件, 表示出列前,队列是满的,可能存在入列等待线程,出列成功之后,解除等待,唤醒入列等待线程。
删除-remove : 根据指定元素删除队列中的元素,如果有删除,如果没有返回false
public boolean remove(Object o) {
if (o == null) return false; //参数为null 直接返回
fullyLock(); //为保证入列出列线程安全,加双锁
try {
//循环遍历列表,删除指定元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail); //元素删除
return true;
}
}
return false;
} finally {
fullyUnlock(); //是否双锁
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//删除元素后,队列元素个数-1,唤醒入列等待线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
LinkedBlockingQueue 在执行删除操作,需要对takeLock 跟 putLock同时加锁,其目标确保在删除期间,其他线程无法操作队列,进而保证删除操作的线程安全。
总结
LinkedBlockingQueue 使用2把锁确保线程安全,入列时使用putLock,出列时使用takeLock,这种锁分离操作机制,在一定层面上提高队列的吞吐量,在高并发的情况下生产者(入列线程)和消费者(出列线程)可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html