Java - Queue

简介

Queue,翻译成队列,是一种先进先出(FIFO, First In First Out)的数据结构。最先放进去的,取的时候也就最先取出来。最形象的比喻就是我们常见的排队就是一个队列。排队时,新来的人进入队尾,先到的人首先接收服务。

Queue大多数是单向队列,即只能从一端取数据,另一端放入数据。就像把羽毛球放入球筒,从一端放入,从另一端取出。

image.png

Queue体系

public interface Queue<E> extends Collection<E> {
    boolean add(E e);
    boolean offer(E e);
    E remove();
    E poll();
    E element();
    E peek();
}
  • boolean add(E e):向queue中添加element。成功返回true。不会返回false。当添加失败(比如queue有大小限制),则抛出异常IllegalStateException
  • boolean offer(E e):跟add类似。区别在于add插入失败会抛出异常,offer会返回false。
  • E remove(): 返回第一个element,并从queue中删除。queue为空则抛出异常NoSuchElementException
  • E pool(): 返回第一个element,并从queue中删除。queue为空则返回null
  • E element(): 返回第一个element,但是不从queue中删除。queue为空则抛出异常NoSuchElementException
  • E peak():返回第一个element,但是不从queue中删除。queue为空则抛出异常

上面的6个方法分别表示了3种操作,每一种操作都有两种类型,抛出异常的类型有特定返回值的类型。下面用表格来描述上述方法的异同:

方法 作用 返回值 成功 失败
add(E e) 插入元素 boolean true IllegalStateException
offer(E e) 同add() boolean true false
remove(E e) 返回第一个元素,并删除 E E NoSuchElementException
pool(E e) 同remove boolean E null
element() 返回第一个元素,不删除 E E NoSuchElementException
peak() 同element E E null

Queue接口常见的扩展和实现有:

image.png

  • AbstractQueue: 实现部分方法的抽象类。做为大多数具体实现类的基类。
  • BlockingQueue: 阻塞队列。在Queue的基础上增加了阻塞接口。
  • Deque: 双向队列,double ended queue的缩写。队列两端都可以操作队列。

AbstractQueue

AbstractQueue类似于很多JDK源码中的Abstract*类,实现了部分通用方法,做为具体实现类的基类。这里主要讲一下PriorityQueueConcurrentLinkedQueue

PriorityQueue

基于堆实现的优先队列。获取数据时是有序的。默认是升序。
主要特点:

  • 无界队列。队列是用数组实现,需要指定大小,数组大小可以动态增加,容量无限。
  • 要实现插入的元素有序,有两种方法:
    • 插入元素实现了Comparable接口
    • 在构造函数中传入Comparator实现,如下
public class PriorityQueue<E> extends AbstractQueue<E>{
    public PriorityQueue(int initialCapacity, Comparator<? super E> comparator);
}
  • 非线程安全。如需线程安全的实现,请使用PriorityBlockingQueue
  • 插入的元素不能为null
  • 使用for-eachiterator来遍历优先队列,得到的结果并不保证有序。只有通过不断调用poll()/remove()方法得到的结果才是有序的。如果需要按顺序遍历,请考虑使用 Arrays.sort(pq.toArray())。如下:
PriorityQueue<Integer> test = new PriorityQueue<>();
test.add(10);
test.add(4);
test.add(7);
test.add(2);
test.add(9);

//可保证有序输出,输出 2 4 7 9 10
while (!test.isEmpty()) {
    System.out.println(test.poll());
}

//不保证有序输出,输出 2 4 7 10 9
for(Integer e : test){
    System.out.println(e);
}

//不保证有序输出,输出 2 4 7 10 9
Iterator it = test.iterator();
while (it.hasNext()) {
    System.out.println(it.next());
}

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个无锁线程安全队列。
常见的线程安全实现都是通过加锁(在“线程安全的实现”这一节就能看到),而加锁的成本是很高的。如果能找到一种方法,既不用加锁,又能保证线程安全,很大可能能极大提升系统性能。ConcurrentLinkedQueue就是使用了这种无锁方法的一种队列。
其实现思想借鉴了很通用,也很重要的CAS操作。CAS简介
详情请见无锁队列 - ConcurrentLinkedQueue

BlockingQueue

BlockingQueue名为阻塞队列
何为阻塞?以从队列中取数据为例,当队列为空时,Queue提供方法的表现为:

  • remove(): 抛出异常NoSuchElementException
  • poll(): 返回null
    这种情况下,如果想在队列不空时获取数据,只能通过循环不断调用remove()poll()。那么,如果能在队列为空的时候,方法不返回,而是等待数据,直到队列中有了数据,才继续进行,就方便很多了。这就是阻塞方法。

BlockingQueueQueue的基础上主要扩展了以下几个阻塞方法:

public interface BlockingQueue<E> extends Queue<E> {
    void put(E e) throws InterruptedException;
    E take() throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException;
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
}
  • void put(E e): 作用同add(E e),当队列满时阻塞,直到队列不满。
  • boolean take(): 作用同remove(),当队列为空时阻塞,直到队列不空。
  • boolean offer(E e, long timeout, TimeUnit unit): 作用同offer(E e),只是增加了超时,允许等一段时间。如果过了超时时间还不能插入成功,则返回false。
  • E poll(long timeout, TimeUnit unit): 作用同poll(),也是增加了超时。如果过了超时时间还不能获取数据,则返回null。
异常行为 插入数据 取数据
抛出异常 add(E e) remove()
返回null offer(E e) poll()
阻塞 put(E e)/offer(E e, long timeout) take()/poll(long timeout)

BlockingQueue的具体实现

BlockingQueue只是个接口,其常用的具体实现有哪些呢?主要有以下这些:

image.png

  • ArrayBlockingQueue: 基于数组的队列,有界队列,必须指定大小。
  • LinkedBlockingQueue: 基于链表的队列,无界队列,大小可指定,可不指定。
  • PriorityBlockingQueue: 优先队列,或者说有序队列。队列中的元素按照指定规则排好序。
  • SynchronousQueue: 没有任何容量(capacity)的队列,是一个比较特殊的队列。
  • DelayQueue: 延迟队列。放入的元素必须过了超时时间才能取出。放入的元素必须实现Delayed接口

ArrayBlockingQueue

主要特点:

  • 有界队列
  • 基于数组存储,数组长度固定,需要在构造函数中指定
public class ArrayBlockingQueue<E> extends AbstractQueue<E> 
    implements BlockingQueue<E>, java.io.Serializable {
    final Object[] items;  //基于数组
    final ReentrantLock lock;  //线程同步锁
    private final Condition notEmpty;  //条件变量,用于取数据同时队列为空时阻塞线程
    private final Condition notFull;  //条件变量,用户插入数据同时队列满时阻塞线程
}

LinkedBlockingQueue

主要特点:

  • 可以是无界队列,也可以是有界队列。区别在于是否设定队列大小。
  • 基于链表存储。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private transient Node<E> head;  //队列头
    private final ReentrantLock takeLock = new ReentrantLock();  //取数据线程同步锁
    private final Condition notEmpty = takeLock.newCondition(); //条件变量,用于取数据同时队列为空时阻塞线程
    private final ReentrantLock putLock = new ReentrantLock(); //插入数据线程同步锁
    private final Condition notFull = putLock.newCondition(); //条件变量,用户插入数据同时队列满时阻塞线程
}

注:put(E e)方法只有在队列满时才组设,因此,如果是无界队列,put(E e)永远不会阻塞。

PriorityBlockingQueue

优先队列PriorityQueue的线程安全版本,同时提供阻塞方法。
主要特点同PriorityQueue

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private transient Object[] queue;
    
}

SynchronousQueue

比较特殊的队列,没有任何存储空间。
举个简单的例子:
A是个快递员,要送快递给用户B,如果使用ArrayBlockingQueue或者LinkedBlockingQueue,会是这样:

  1. A把快递放到快递柜的箱子里(假设快递柜有20个箱子)
  2. 如果有空箱子,可以直接把快递放到空箱子中。
  3. 如果所有的箱子都满了,那么A等着,直到B取了任意快递,空出了箱子,A再把快递放到空箱子中。
  4. 如果所有的箱子都空了,取快递的人B会一直等待,直到有快递投递到箱子中。

那么如果使用SynchronousQueue,情况就不同了

  1. 首先没有能临时存放快递的柜子和箱子
  2. A要送快递,会一直拿着快递等,直到B来取。
  3. B要取快递,也会一直等,直到A来送。
    SynchronousQueue就是类似这种“手到手”的交付方式,不经过任何媒介缓存。

主要特点:

  • 没有任何数据结构可保存数据,不能调用peek()方法来看队列中是否有数据元素
  • 不能遍历队列,应为根本没有存储任何数据
  • 调用put()方法会等待,直到有其他线程调用了take()方法

DelayQueue

DelayQueue的继承结构如下:

image.png

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final PriorityQueue<E> q = new PriorityQueue<E>();  //存储数据
}

由类的声明可知,插入到DelayQueue中的元素必须实现Delayed接口。Delayed接口比较简单,只有一个getDelay(TimeUnit unit)方法。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);  //剩余时间
}

同时,DelayQueue存储数据既不像ArrayBlockingQueue使用数组,也不像LinkedBlockingQueue使用链表,而是使用现成的PriorityQueue。因此DelayQueue取数据的规则跟PriorityQueue类似。

何谓延迟?
延迟主要体现在取数据的时候。通过查看poll()的源码可知:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

在取数的时候,getDelay()必须小于等于0才能把数取出来。通过实现getDelay()方法,就能实现过多长时间以后才能取出数据这种延迟效果。

主要特点:

  • 无界队列,因此put(E e)不会阻塞
  • 调用取元素的方法:remove()poll()take()时,只有当元素超时以后才能取到。

BlockingQueue的应用

实现生产者-消费者
生产者-消费者模式在工程之中应用广泛。BlockingQueue可极大简化实现生产者-消费者的难度。
伪代码:

//生产者
public class Producer{
    private BlockingQueue queue;
    Producer(BlockingQueue queue){this.queue = queue}
    public void produce(E e){
        //生产者调用put()方法,队列满时阻塞等待。
        queue.put(e);
    }
}

//消费者
public class Consumer{
    private BlockingQueue queue;
    Consumer(BlockingQueue queue){this.queue = queue}
    public E consume(){
        //消费者调用take()方法,队列空时阻塞等待
        queue.take(e);
    }
}

做为线程池的等待队列

  • 线程池ThreadPoolExecutor的构造函数接收BlockingQueue做为等待队列
  • Executors.newSingleThreadPool()/Executors.newFixedThreadPool()默认使用LinkedBlockingQueue做为等待队列
  • Executors.newCachedThreadPool()默认使用SynchronousQueue做为等待队列。可以做到一有请求,就创建新线程。

求Top K大/小的元素
比如有1亿个随机数字,找出最大的10个数。这种类似的求Top K的问题很常见。
由于PriorityQueue/PriorityBlockingQueue底层结构是堆(大顶堆/小顶堆),而解决Top K问题的最好办法就是使用堆,因此PriorityQueue/PriorityBlockingQueue是解决该问题的不二选择。
代码摘要:

public class FixSizedPriorityQueue<E extends Comparable> {
    private PriorityQueue<E> queue;
    private int maxSize; // 堆的最大容量

    public FixSizedPriorityQueue(int maxSize) {
        if (maxSize <= 0)
            throw new IllegalArgumentException();
        this.maxSize = maxSize;
        this.queue = new PriorityQueue(maxSize, new Comparator<E>() {
            public int compare(E o1, E o2) {
                // 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 并修改 e.compareTo(peek) 比较规则
                return (o2.compareTo(o1));
            }
        });
    }

    public void add(E e) {
        if (queue.size() < maxSize) { // 未达到最大容量,直接添加
            queue.add(e);
        } else { // 队列已满
            E peek = queue.peek();
            if (e.compareTo(peek) < 0) { // 将新元素与当前堆顶元素比较,保留较小的元素
                queue.poll();
                queue.add(e);
            }
        }
    }
}

延时需求
如:考试时间为120分钟,30分钟后才可交卷。这种情况下,就需要使用到DelayQueue了。
案例参考

Deque

Deque发音为'deck',为双向队列。
Queue默认是单向队列,数据只能从一头放入,从另一头取出,这跟羽毛球的放取原理一样。双向队列就像放取乒乓球,可以从两端放入,也可以从两端取出。

image.png

Deque接口的部分组成

public interface Deque<E> extends Queue<E> {
    //新增方法
    void addFirst(E e);
    void addLast(E e);
    boolean offerFirst(E e);
    boolean offerLast(E e);
    E removeFirst();
    E removeLast();
    E pollFirst();
    E pollLast();
    E getFirst();
    E getLast();
    E peekFirst();
    E peekLast();
    void push(E e);
    E pop();

    //Queue接口方法
    boolean add(E e);
    boolean offer(E e);
    E remove();
    E poll();
    E element();
    E peek();
}

从扩展的函数名字就能看出来,Deque分别针对插入和取出接口提供了对队列头和尾的操作。
同时,从Queue继承过来方法的与扩展方法有如下对应关系:

add(E e) == addLast(E e)
offer(E e) == offerLast(E e)
remove() == removeFirst()
poll() == pollFirst()
element() == getFirst()
peek() == peekFirst()

同时,Deque还扩展出了push(E e)pop()方法。其中:

push(E e)等同于addFirst(E e)
pop()等同于removeFirst()

因此,当Deque只通过push(E e)pop()操作队列头时,Deque就演化成了一个栈Stack

[图片上传失败...(image-f152e6-1551348671199)]

扩展接口或实现类

  • ArrayDeque: 基于数组存储的双向队列
  • LinkedList: 基于链表存储的双向队列
  • ConcurrentLinkedDeque: 基于链表的线程安全版本
  • BlockingDeque: 扩展了Deque接口的阻塞接口,同时继承了BlockingQueue接口。
  • LinkedBlockingDeque: 实现了BlockingDeque的双向队列。

线程安全的实现

引申
Java线程安全...

BlockingQueue中的方法都是线程安全的,都使用了ReentrantLock做为锁。如:
ArrayBlockingQueue
ArrayBlockingQueue中有公共变量count来计数元素个数,因此需要一个全局的锁来保护。

public ArrayBlockingQueue(int capacity, boolean fair) {
    lock = new ReentrantLock(fair);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //do something
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //do something
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue
LinkedBlockingQueue中计数元素个数使用的是AtomicInteger类型,本身是线程安全的。因此没有使用全局锁,而是针对插入和获取分别创建了两个锁。

private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();

public boolean offer(E e) {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //do something
    } finally {
        putLock.unlock();
    }
}

public E poll() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //do something
    } finally {
        takeLock.unlock();
    }
}

引申
上面实现线程安全的方式都是通过锁。而我们知道,锁是一个比较重的操作,在高并发系统中,能少用就少用。因此,在此介绍一个不用锁就能实现线程安全的队列:
无锁队列 - ConcurrentLinkedQueue...
无锁并且保证线程安全的思想是使用CAS...

阻塞算法的实现

引申
Java线程通信...

BlockingQueue的功能是个标准的生产者-消费者模式。线程间通信使用的是条件变量Condition。伪代码如下:

ReentrantLock lock = new ReentrantLock(fair);
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

public boolean put(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列已经满了,那么等待队列notFull
        while(count == container.size){
            notFull.await()
        }
        notEmpty.signal();
        //do other things
    } finally {
        lock.unlock();
    }
}

public E take() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列为空,那么等待队列notEmpty
        while (count == 0){
            notEmpty.await();
        }
        notFull.signal();
    } finally {
        lock.unlock();
    }
}

非阻塞型
ConcurrentLinkedQueue: 无锁线程安全队列

参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容