简介
Queue
,翻译成队列,是一种先进先出(FIFO, First In First Out)的数据结构。最先放进去的,取的时候也就最先取出来。最形象的比喻就是我们常见的排队就是一个队列。排队时,新来的人进入队尾,先到的人首先接收服务。
Queue
大多数是单向队列,即只能从一端取数据,另一端放入数据。就像把羽毛球放入球筒,从一端放入,从另一端取出。
Queue体系
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
-
boolean add(E e)
:向queue中添加element。成功返回true。不会返回false。当添加失败(比如queue有大小限制),则抛出异常IllegalStateException
-
boolean offer(E e)
:跟add类似。区别在于add插入失败会抛出异常,offer会返回false。 -
E remove()
: 返回第一个element,并从queue中删除。queue为空则抛出异常NoSuchElementException
-
E pool()
: 返回第一个element,并从queue中删除。queue为空则返回null -
E element()
: 返回第一个element,但是不从queue中删除。queue为空则抛出异常NoSuchElementException
-
E peak()
:返回第一个element,但是不从queue中删除。queue为空则抛出异常
上面的6个方法分别表示了3种操作,每一种操作都有两种类型,抛出异常的类型和有特定返回值的类型。下面用表格来描述上述方法的异同:
方法 | 作用 | 返回值 | 成功 | 失败 |
---|---|---|---|---|
add(E e) | 插入元素 | boolean | true | IllegalStateException |
offer(E e) | 同add() | boolean | true | false |
remove(E e) | 返回第一个元素,并删除 | E | E | NoSuchElementException |
pool(E e) | 同remove | boolean | E | null |
element() | 返回第一个元素,不删除 | E | E | NoSuchElementException |
peak() | 同element | E | E | null |
Queue
接口常见的扩展和实现有:
-
AbstractQueue
: 实现部分方法的抽象类。做为大多数具体实现类的基类。 -
BlockingQueue
: 阻塞队列。在Queue
的基础上增加了阻塞接口。 -
Deque
: 双向队列,double ended queue
的缩写。队列两端都可以操作队列。
AbstractQueue
AbstractQueue
类似于很多JDK源码中的Abstract*
类,实现了部分通用方法,做为具体实现类的基类。这里主要讲一下PriorityQueue
和ConcurrentLinkedQueue
。
PriorityQueue
基于堆实现的优先队列。获取数据时是有序的。默认是升序。
主要特点:
- 无界队列。队列是用数组实现,需要指定大小,数组大小可以动态增加,容量无限。
- 要实现插入的元素有序,有两种方法:
- 插入元素实现了
Comparable
接口 - 在构造函数中传入
Comparator
实现,如下
- 插入元素实现了
public class PriorityQueue<E> extends AbstractQueue<E>{
public PriorityQueue(int initialCapacity, Comparator<? super E> comparator);
}
- 非线程安全。如需线程安全的实现,请使用
PriorityBlockingQueue
- 插入的元素不能为null
- 使用
for-each
或iterator
来遍历优先队列,得到的结果并不保证有序。只有通过不断调用poll()/remove()
方法得到的结果才是有序的。如果需要按顺序遍历,请考虑使用 Arrays.sort(pq.toArray())。如下:
PriorityQueue<Integer> test = new PriorityQueue<>();
test.add(10);
test.add(4);
test.add(7);
test.add(2);
test.add(9);
//可保证有序输出,输出 2 4 7 9 10
while (!test.isEmpty()) {
System.out.println(test.poll());
}
//不保证有序输出,输出 2 4 7 10 9
for(Integer e : test){
System.out.println(e);
}
//不保证有序输出,输出 2 4 7 10 9
Iterator it = test.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一个无锁线程安全队列。
常见的线程安全实现都是通过加锁(在“线程安全的实现”这一节就能看到),而加锁的成本是很高的。如果能找到一种方法,既不用加锁,又能保证线程安全,很大可能能极大提升系统性能。ConcurrentLinkedQueue
就是使用了这种无锁方法的一种队列。
其实现思想借鉴了很通用,也很重要的CAS操作。CAS简介
详情请见无锁队列 - ConcurrentLinkedQueue
BlockingQueue
BlockingQueue
名为阻塞队列。
何为阻塞?以从队列中取数据为例,当队列为空时,Queue
提供方法的表现为:
-
remove()
: 抛出异常NoSuchElementException -
poll()
: 返回null
这种情况下,如果想在队列不空时获取数据,只能通过循环不断调用remove()
或poll()
。那么,如果能在队列为空的时候,方法不返回,而是等待数据,直到队列中有了数据,才继续进行,就方便很多了。这就是阻塞方法。
BlockingQueue
在Queue
的基础上主要扩展了以下几个阻塞方法:
public interface BlockingQueue<E> extends Queue<E> {
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
}
-
void put(E e)
: 作用同add(E e)
,当队列满时阻塞,直到队列不满。 -
boolean take()
: 作用同remove()
,当队列为空时阻塞,直到队列不空。 -
boolean offer(E e, long timeout, TimeUnit unit)
: 作用同offer(E e)
,只是增加了超时,允许等一段时间。如果过了超时时间还不能插入成功,则返回false。 -
E poll(long timeout, TimeUnit unit)
: 作用同poll()
,也是增加了超时。如果过了超时时间还不能获取数据,则返回null。
异常行为 | 插入数据 | 取数据 |
---|---|---|
抛出异常 | add(E e) | remove() |
返回null | offer(E e) | poll() |
阻塞 | put(E e)/offer(E e, long timeout) | take()/poll(long timeout) |
BlockingQueue的具体实现
BlockingQueue
只是个接口,其常用的具体实现有哪些呢?主要有以下这些:
-
ArrayBlockingQueue
: 基于数组的队列,有界队列,必须指定大小。 -
LinkedBlockingQueue
: 基于链表的队列,无界队列,大小可指定,可不指定。 -
PriorityBlockingQueue
: 优先队列,或者说有序队列。队列中的元素按照指定规则排好序。 -
SynchronousQueue
: 没有任何容量(capacity)的队列,是一个比较特殊的队列。 -
DelayQueue
: 延迟队列。放入的元素必须过了超时时间才能取出。放入的元素必须实现Delayed
接口
ArrayBlockingQueue
主要特点:
- 有界队列
- 基于数组存储,数组长度固定,需要在构造函数中指定
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items; //基于数组
final ReentrantLock lock; //线程同步锁
private final Condition notEmpty; //条件变量,用于取数据同时队列为空时阻塞线程
private final Condition notFull; //条件变量,用户插入数据同时队列满时阻塞线程
}
LinkedBlockingQueue
主要特点:
- 可以是无界队列,也可以是有界队列。区别在于是否设定队列大小。
- 基于链表存储。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Node<E> head; //队列头
private final ReentrantLock takeLock = new ReentrantLock(); //取数据线程同步锁
private final Condition notEmpty = takeLock.newCondition(); //条件变量,用于取数据同时队列为空时阻塞线程
private final ReentrantLock putLock = new ReentrantLock(); //插入数据线程同步锁
private final Condition notFull = putLock.newCondition(); //条件变量,用户插入数据同时队列满时阻塞线程
}
注:
put(E e)
方法只有在队列满时才组设,因此,如果是无界队列,put(E e)
永远不会阻塞。
PriorityBlockingQueue
优先队列PriorityQueue
的线程安全版本,同时提供阻塞方法。
主要特点同PriorityQueue
。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue;
}
SynchronousQueue
比较特殊的队列,没有任何存储空间。
举个简单的例子:
A是个快递员,要送快递给用户B,如果使用ArrayBlockingQueue
或者LinkedBlockingQueue
,会是这样:
- A把快递放到快递柜的箱子里(假设快递柜有20个箱子)
- 如果有空箱子,可以直接把快递放到空箱子中。
- 如果所有的箱子都满了,那么A等着,直到B取了任意快递,空出了箱子,A再把快递放到空箱子中。
- 如果所有的箱子都空了,取快递的人B会一直等待,直到有快递投递到箱子中。
那么如果使用SynchronousQueue
,情况就不同了
- 首先没有能临时存放快递的柜子和箱子
- A要送快递,会一直拿着快递等,直到B来取。
- B要取快递,也会一直等,直到A来送。
SynchronousQueue
就是类似这种“手到手”的交付方式,不经过任何媒介缓存。
主要特点:
- 没有任何数据结构可保存数据,不能调用peek()方法来看队列中是否有数据元素
- 不能遍历队列,应为根本没有存储任何数据
- 调用
put()
方法会等待,直到有其他线程调用了take()
方法
DelayQueue
DelayQueue
的继承结构如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>(); //存储数据
}
由类的声明可知,插入到DelayQueue
中的元素必须实现Delayed
接口。Delayed
接口比较简单,只有一个getDelay(TimeUnit unit)
方法。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit); //剩余时间
}
同时,DelayQueue
存储数据既不像ArrayBlockingQueue
使用数组,也不像LinkedBlockingQueue
使用链表,而是使用现成的PriorityQueue
。因此DelayQueue
取数据的规则跟PriorityQueue
类似。
何谓延迟?
延迟主要体现在取数据的时候。通过查看poll()
的源码可知:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
在取数的时候,getDelay()
必须小于等于0才能把数取出来。通过实现getDelay()
方法,就能实现过多长时间以后才能取出数据这种延迟效果。
主要特点:
- 无界队列,因此
put(E e)
不会阻塞 - 调用取元素的方法:
remove()
、poll()
、take()
时,只有当元素超时以后才能取到。
BlockingQueue的应用
实现生产者-消费者
生产者-消费者模式在工程之中应用广泛。BlockingQueue
可极大简化实现生产者-消费者的难度。
伪代码:
//生产者
public class Producer{
private BlockingQueue queue;
Producer(BlockingQueue queue){this.queue = queue}
public void produce(E e){
//生产者调用put()方法,队列满时阻塞等待。
queue.put(e);
}
}
//消费者
public class Consumer{
private BlockingQueue queue;
Consumer(BlockingQueue queue){this.queue = queue}
public E consume(){
//消费者调用take()方法,队列空时阻塞等待
queue.take(e);
}
}
做为线程池的等待队列
- 线程池
ThreadPoolExecutor
的构造函数接收BlockingQueue
做为等待队列 -
Executors.newSingleThreadPool()/Executors.newFixedThreadPool()
默认使用LinkedBlockingQueue
做为等待队列 -
Executors.newCachedThreadPool()
默认使用SynchronousQueue
做为等待队列。可以做到一有请求,就创建新线程。
求Top K大/小的元素
比如有1亿个随机数字,找出最大的10个数。这种类似的求Top K的问题很常见。
由于PriorityQueue/PriorityBlockingQueue
底层结构是堆(大顶堆/小顶堆),而解决Top K问题的最好办法就是使用堆,因此PriorityQueue/PriorityBlockingQueue
是解决该问题的不二选择。
代码摘要:
public class FixSizedPriorityQueue<E extends Comparable> {
private PriorityQueue<E> queue;
private int maxSize; // 堆的最大容量
public FixSizedPriorityQueue(int maxSize) {
if (maxSize <= 0)
throw new IllegalArgumentException();
this.maxSize = maxSize;
this.queue = new PriorityQueue(maxSize, new Comparator<E>() {
public int compare(E o1, E o2) {
// 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 并修改 e.compareTo(peek) 比较规则
return (o2.compareTo(o1));
}
});
}
public void add(E e) {
if (queue.size() < maxSize) { // 未达到最大容量,直接添加
queue.add(e);
} else { // 队列已满
E peek = queue.peek();
if (e.compareTo(peek) < 0) { // 将新元素与当前堆顶元素比较,保留较小的元素
queue.poll();
queue.add(e);
}
}
}
}
延时需求
如:考试时间为120分钟,30分钟后才可交卷。这种情况下,就需要使用到DelayQueue
了。
案例参考
Deque
Deque
发音为'deck',为双向队列。
Queue
默认是单向队列,数据只能从一头放入,从另一头取出,这跟羽毛球的放取原理一样。双向队列就像放取乒乓球,可以从两端放入,也可以从两端取出。
Deque
接口的部分组成
public interface Deque<E> extends Queue<E> {
//新增方法
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
E getFirst();
E getLast();
E peekFirst();
E peekLast();
void push(E e);
E pop();
//Queue接口方法
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
从扩展的函数名字就能看出来,Deque
分别针对插入和取出接口提供了对队列头和尾的操作。
同时,从Queue
继承过来方法的与扩展方法有如下对应关系:
add(E e)
==addLast(E e)
offer(E e)
==offerLast(E e)
remove()
==removeFirst()
poll()
==pollFirst()
element()
==getFirst()
peek()
==peekFirst()
同时,Deque
还扩展出了push(E e)
和pop()
方法。其中:
push(E e)
等同于addFirst(E e)
pop()
等同于removeFirst()
因此,当Deque
只通过push(E e)
和pop()
操作队列头时,Deque
就演化成了一个栈Stack
!
[图片上传失败...(image-f152e6-1551348671199)]
扩展接口或实现类
-
ArrayDeque
: 基于数组存储的双向队列 -
LinkedList
: 基于链表存储的双向队列 -
ConcurrentLinkedDeque
: 基于链表的线程安全版本 -
BlockingDeque
: 扩展了Deque
接口的阻塞接口,同时继承了BlockingQueue
接口。 -
LinkedBlockingDeque
: 实现了BlockingDeque
的双向队列。
线程安全的实现
引申
Java线程安全...
BlockingQueue
中的方法都是线程安全的,都使用了ReentrantLock
做为锁。如:
ArrayBlockingQueue
ArrayBlockingQueue
中有公共变量count
来计数元素个数,因此需要一个全局的锁来保护。
public ArrayBlockingQueue(int capacity, boolean fair) {
lock = new ReentrantLock(fair);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//do something
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//do something
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
LinkedBlockingQueue
中计数元素个数使用的是AtomicInteger
类型,本身是线程安全的。因此没有使用全局锁,而是针对插入和获取分别创建了两个锁。
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
public boolean offer(E e) {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//do something
} finally {
putLock.unlock();
}
}
public E poll() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//do something
} finally {
takeLock.unlock();
}
}
引申
上面实现线程安全的方式都是通过锁。而我们知道,锁是一个比较重的操作,在高并发系统中,能少用就少用。因此,在此介绍一个不用锁就能实现线程安全的队列:
无锁队列 - ConcurrentLinkedQueue...
无锁并且保证线程安全的思想是使用CAS...
阻塞算法的实现
引申
Java线程通信...
BlockingQueue
的功能是个标准的生产者-消费者模式。线程间通信使用的是条件变量Condition
。伪代码如下:
ReentrantLock lock = new ReentrantLock(fair);
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public boolean put(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列已经满了,那么等待队列notFull
while(count == container.size){
notFull.await()
}
notEmpty.signal();
//do other things
} finally {
lock.unlock();
}
}
public E take() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列为空,那么等待队列notEmpty
while (count == 0){
notEmpty.await();
}
notFull.signal();
} finally {
lock.unlock();
}
}
非阻塞型
ConcurrentLinkedQueue: 无锁线程安全队列