Java并发编程——LinkedBlockingDeque

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:


如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、LinkedBlockingDeque

  • LinkedBlockingDeque是一个由链表结构(双向链表)组成的双向阻塞队列,即可以从队列的两端插入和移除元素,支持FIFO和FILO。

  • 相比于其他阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法。

  • LinkedBlockingDeque是可选容量的,默认容量大小为Integer.MAX_VALUE。

  • LinkedBlockingDequeConcurrentLinkedDeque类似,都是一种双端队列的结构,只不过LinkedBlockingDeque同时也是一种阻塞队列。

注意:LinkedBlockingDeque底层利用ReentrantLock实现同步,并不像ConcurrentLinkedDeque那样采用无锁算法。

如何使用 LinkedBlockingDeque

  • 1、并发场景下,需要作为双端队列使用时,如果只是作为 FIFO 队列使用,则 LinkedBlockingQueue 的性能更高。
  • 2、指定队列的容量,以避免生产速率远高于消费速率时资源耗尽的问题。

使用 LinkedBlockingDeque 的风险

  • 1、未指定容量的情况下,生产速率远高于消费速率时,会导致内存耗尽而 OOM。
  • 2、高并发场景下,性能远低于 LinkedBlockingQueue。
  • 3、由于需要维持前后节点的链接,内存消耗也高于 LinkedBlockingQueue。

2.1、内部结构

LinkedBlockingDeque内部是双链表的结构,结点Node的定义如下:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /** 双向链表节点 */
    static final class Node<E> {
        /**
         *  节点元素,如果节点已经被移除,则为 null
         */
        E item;

        /**
         * 前驱结点指针.
         */
        Node<E> prev;

        /**
         * 后驱结点指针.
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
}   

2.2、成员属性

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     * 头结点
     */
    transient Node<E> first;

    /**
     * 尾结点
     */
    transient Node<E> last;

    /** 队列中个数 */
    private transient int count;

    /** 队列长度,可以使用构造注入,如未设定,默认为无界队列 */
    private final int capacity;

    /** 显示锁 */
    final ReentrantLock lock = new ReentrantLock();

    /** 消费队列(队列为空时,无法消费,线程阻塞) */
    private final Condition notEmpty = lock.newCondition();

    /** 生产队列(队列满时,无法入队,线程阻塞) */
    private final Condition notFull = lock.newCondition();
}   

2.3、构造函数

LinkedBlockingDeque一共三种构造器,不指定容量时,默认为Integer.MAX_VALUE:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     * 默认构造器.
     */ 
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    
    /**
     * 指定容量的构造器.
     */ 
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    /**
     * 从已有集合构造队列.
     */
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }   
}   

2.4、队首入队

初始:


队首插入结点node:


  • 1、将目标元素 e 添加到队列头部,如果队列已满,则阻塞等待有可用空间后重试
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  将目标元素 e 添加到队列头部,如果队列已满,则阻塞等待有可用空间后重试
     */ 
    public void putFirst(E e) throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试在头部添加元素
            while (!linkFirst(node))
                // 当前线程在非满条件上等待
                notFull.await();
        } finally {
            //释放锁
            lock.unlock();
        }
    }
    
    private boolean linkFirst(Node<E> node) {
        // 队列已满,则直接返回 false
        if (count >= capacity)
            return false;
        // 读取头节点    
        Node<E> f = first;
        // 将旧头结点链接到目标节点之后
        node.next = f;
        // 写入新头节点
        first = node;
        // 1)当前元素为第一个添加到队列中的元素
        if (last == null)
            // 写入尾节点
            last = node;
        else
            // 将旧头节点的前置节点设置为新头结点
            f.prev = node;
        // 递增计数 
        ++count;
        // 唤醒在非空条件上阻塞等待的线程来读取元素
        notEmpty.signal();
        return true;
    }   
}   
  • 2、如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列头部
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列头部
     */ 
    public boolean offerFirst(E e) {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试在头部添加元素
            return linkFirst(node);
        } finally {
            //释放锁
            lock.unlock();
        }
    }
}   
  • 3、在指定的超时时间内尝试将目标元素 e 添加到队列头部,成功则返回 true
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  在指定的超时时间内尝试将目标元素 e 添加到队列头部,成功则返回 true
     */ 
    public boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 头结点添加失败
            while (!linkFirst(node)) {
                // 已经超时则直接返回
                if (nanos <= 0)
                    return false;
                // 当前线程在非满条件上阻塞等待,唤醒后再次尝试添加 
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}   

2.4、队尾入队

初始:


队尾插入结点node:


  • 1、将目标元素 e 添加到队列尾部,如果队列已满,则阻塞等待有可用空间后重试
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public void put(E e) throws InterruptedException {
        putLast(e);
    }
    
    /**
     *  将目标元素 e 添加到队列尾部,如果队列已满,则阻塞等待有可用空间后重试    
     */ 
    public void putLast(E e) throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试将节点链接到队列尾部
            while (!linkLast(node))
                // 队列已满,当前线程在非满条件上阻塞等待,被唤醒后再次尝试
                notFull.await();
        } finally {
            //释放锁
            lock.unlock();
        }
    }   
    
    private boolean linkLast(Node<E> node) {
        // 队列已满,则直接返回 false
        if (count >= capacity)
            return false;
        // 读取尾节点    
        Node<E> l = last;
        // 将目标节点链接到尾节点之后
        node.prev = l;
        // 写入尾节点为新增节点
        last = node;
        // 1)当前元素是第一个加入队列的元素
        if (first == null)
            // 写入头结点
            first = node;
        else
            // 将旧尾节点的后置节点更新为新增节点
            l.next = node;
        // 递增总数 
        ++count;
        // 唤醒在非空条件上等待的线程
        notEmpty.signal();
        return true;
    }   
}   
  • 2、如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列尾部
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public boolean offer(E e) {
        return offerLast(e);
    }
    
    /**
     *  如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列尾部
     */ 
    public boolean offerLast(E e) {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试将节点链接到队列尾部
            return linkLast(node);
        } finally {
            //释放锁
            lock.unlock();
        }
    }   
}   
  • 3、在指定的超时时间内尝试将目标元素 e 添加到队列尾部,成功则返回 true
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        return offerLast(e, timeout, unit);
    }

    /**
     *  在指定的超时时间内尝试将目标元素 e 添加到队列尾部,成功则返回 true
     */
    public boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            // 尝试将目标元素 e 添加到队列尾部
            while (!linkLast(node)) {
                // 已经超时则直接返回 false
                if (nanos <= 0)
                    return false;
                // 当前线程在非满条件上阻塞等待,被唤醒后再次尝试  
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            //释放锁
            lock.unlock();
        }
    }   
}   

2.5、队首出队

初始:


删除队首结点:


  • 1、移除并返回头部节点,如果队列为空,则阻塞等待有可用元素之后重试
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E take() throws InterruptedException {
        return takeFirst();
    }
    
    /**
     *  移除并返回头部节点,如果队列为空,则阻塞等待有可用元素之后重试
     */ 
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 尝试移除并返回头部节点
            while ( (x = unlinkFirst()) == null)
                // 队列为空,则阻塞等待有可用元素之后重试
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }   
}   
  • 2、如果队列为空,则立即返回 null,否则移除并返回头部元素
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E poll() {
        return pollFirst();
    }
    
    /**
     *  如果队列为空,则立即返回 null,否则移除并返回头部元素
     */ 
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }   
    
    /**
     * 从队首删除一个元素, 失败则返回null.
     */ 
    private E unlinkFirst() {    
        // 获取首节点    
        Node<E> f = first;    
        // 首节点为null,则返回null    
        if (f == null)        
            return null;    
        // 获取首节点的后继节点    
        Node<E> n = f.next;    
        // 移除first,将首节点更新为n
        E item = f.item;    
        f.item = null;    
        f.next = f; // help GC    
        first = n;    
        // 移除首节点后,为空队列    
        if (n == null)        
            last = null;    
        else        
            // 将新的首节点的前驱节点设置为null        
            n.prev = null;    
        --count;    
        // 唤醒阻塞在notFull上的线程    
        notFull.signal();    
        return item;
    }
}   
  • 3、在指定的超时时间内尝试移除并返回头部元素,如果已经超时,则返回 null
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return pollFirst(timeout, unit);
    }
    
    /**
     *  在指定的超时时间内尝试移除并返回头部元素,如果已经超时,则返回 null
     */ 
    public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            // 尝试移除并返回头部元素
            while ( (x = unlinkFirst()) == null) {
                // 已经超时则返回 null
                if (nanos <= 0)
                    return null;
                // 当前线程在非空条件上阻塞等待,被唤醒后进行重试  
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 移除成功则直接返回头部元素
            return x;
        } finally {
            lock.unlock();
        }
    }   
}   

2.6、队尾出队

初始:


删除队尾结点:


  • 1、移除并返回尾部节点,如果队列为空,则阻塞等待有可用元素之后重试
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  移除并返回尾部节点,如果队列为空,则阻塞等待有可用元素之后重试
     */ 
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 尝试移除并返回尾部节点
            while ( (x = unlinkLast()) == null)
                // 队列为空,则阻塞等待有可用元素之后重试
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 从队尾删除一个元素, 失败则返回null.
     */     
    private E unlinkLast() {       
        // 获取尾节点    
        Node<E> l = last;    
        // 尾节点为null,则返回null    
        if (l == null)        
            return null;    
        // 获取尾节点的前驱节点    
        Node<E> p = l.prev;    
        // 移除尾节点,将尾节点更新为p    
        E item = l.item;    
        l.item = null;    
        l.prev = l; // help GC    
        last = p;    
        // 移除尾节点后,为空队列    
        if (p == null)        
            first = null;    
        else        
            // 将新的尾节点的后继节点设置为null        
            p.next = null;    
        --count;    
        // 唤醒阻塞在notFull上的线程    
        notFull.signal();    
        return item;
    }   
}   
  • 2、如果队列为空,则立即返回 null,否则移除并返回尾部元素
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  如果队列为空,则抛出异常,否则移除并返回尾部元素
     */ 
    public E removeLast() {
        E x = pollLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    /**
     *  如果队列为空,则立即返回 null,否则移除并返回尾部元素
     */ 
    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //从队尾删除一个元素, 失败则返回null. 
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }   
}
  • 3、在指定的超时时间内尝试移除并返回尾部元素,如果已经超时,则返回 null
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            // 尝试移除并返回尾部元素
            while ( (x = unlinkLast()) == null) {
                // 已经超时则返回 null
                if (nanos <= 0)
                    return null;
                // 当前线程在非空条件上阻塞等待,被唤醒后进行重试
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 移除成功则直接返回尾部元素
            return x;
        } finally {
            lock.unlock();
        }
    }
}

三、LinkedBlockingQueue与LinkedBlockingDeque对比

LinkedBlockingQueue

  • FIFO;
  • 读写分开两个ReentrantLock;

LinkedBlockingDeque

  • FIFO & FILO;
  • 全局一把ReentrantLock;

参考:
https://www.itzhai.com/articles/graphical-blocking-queue.html

https://www.cnblogs.com/zhuxudong/p/10079511.html

https://segmentfault.com/a/1190000016398508

https://www.cnblogs.com/snake107/p/12035580.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容