大飞老师带你看线程(并发容器—LinkedBlockingQueue)

本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。

概念

LinkedBlockingQueue按照api解释:一个基于链表而实现的有界阻塞队列。遵循先进先出原则,由队头入列,再从队尾出列。具体操作上跟ArrayBlockingQueue类似,区别在于底层维护数据上,LinkedBlockingQueue底层是一个链接,而ArrayBlockingQueue是一个数组。


外部结构

内部结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private final AtomicInteger count = new AtomicInteger();  //队列元素个数
    private final int capacity;  //队列容器
    transient Node<E> head;  //队头
    private transient Node<E> last; //队尾

    //出列入列过程中维护现场安全的各类锁
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    //队列数据节点
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
}
基本操作
public class App {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue(5);
        //入列
        queue.add("a");  //队列满后抛异常
        queue.put("b");//队列满后阻塞
        queue.offer("c"); //入列失败返回false
        System.out.println(queue);
        queue.put("a");
        queue.put("b");
        queue.put("c");
        queue.put("d");
        queue.put("e");
        //出列
        queue.remove("a"); //删除指定元素
        queue.poll();  //出列,如果队列为空返回null
        queue.take();  //队列为空,阻塞等待

        System.out.println(queue);
    }
}

一般推荐使用put入列, take出列

源码解析

构造-LinkedBlockingQueue提供了3个构造器,无参, 带有容量,带集合数据

    //无参数时,默认容量为int的最大值
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    //带容量参数【推荐】
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        //初始化队头,队尾
        last = head = new Node<E>(null);
    }
    //带数据集合
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);   //容量为int最大值
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); //谨慎起见也加锁,需要将传入集合逐一入列
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

LinkedBlockingQueue 3个构造器,实际使用中更推荐使用指定容量的队列。

在继续看源码前,先了解一个原子操作类:AtomicInteger

        //int 类型的原子操作,不指定初始为0
        //底层维护了一个volatile 修饰变量 value = 0
        AtomicInteger atomicInteger = new AtomicInteger();
        //获取:value = 0
        System.out.println(atomicInteger.get());  //0
        //返回value值0, 然后value 值加一【这里也是原子操作】
        System.out.println(atomicInteger.getAndIncrement()); //0
        //返回value值1, 然后value 值加一【这里也是原子操作】
        System.out.println(atomicInteger.getAndIncrement()); //1
        //获取:value=2
        System.out.println(atomicInteger.get()); //2
        //int 类型的原子操作,不指定初始为0
        //底层维护了一个volatile 修饰变量 value = 0
        AtomicInteger atomicInteger = new AtomicInteger();
        //获取:value = 0
        System.out.println(atomicInteger.get());  //0
        //返回value值0, 然后value 值减一【这里也是原子操作】
        System.out.println(atomicInteger.getAndDecrement()); //0
        //返回value值1, 然后value 值减一【这里也是原子操作】
        System.out.println(atomicInteger.getAndDecrement()); //-1
        //返回value值1
        System.out.println(atomicInteger.get()); //-2

getAndIncrement : 返回未操作前value 的值, 然后加1
getAndDecrement : 返回未操作前value 的值, 然后减1

入列-put:将入列数据添加到队尾,如果队列满了,阻塞等待。

public void put(E e) throws InterruptedException {
        //入列元素不允许为null
        if (e == null) throw new NullPointerException();
        //队列临时容量缓存,作为执行唤醒/阻塞线程操作标记
        int c = -1;
        Node<E> node = new Node<E>(e);
        //入列锁
        final ReentrantLock putLock = this.putLock;
        //队列元素个数
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); //入列前加锁,可中断锁
        try {
            //自旋排除硬件加锁延时问题
            //如果队列已满,线程阻塞等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //数据入列
            enqueue(node);
             //原子操作,入列后再获取队列元素个数,并+1,确保当前操作队列元素个数最新
            c = count.getAndIncrement();
            //c + 1 < capacity 表示队列未满,仍可添加,唤醒未持锁而等待的入列线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock(); //释放锁
        }
        //c == 0 说明队列为空,唤醒入列线程入列 
        if (c == 0)
            signalNotEmpty();
    }

从上面源码上看put方法其实做5件事:
1>判断元素是否null, 为null 抛异常
2>判断是否满列,满列则等待,此处需要留意while这个操作。
理想请求下,if即可,但是有种情况,如果jvm执行指令传到cpu到程序时间片执行存在一点的时间延时,while 重复执行,可以减少延时影响。
3>数据入列
4>入列后需要唤醒未持锁而等待的入列线程
5>c==0的判断, c的值是入列前容量值,如果为0说明入列前,队列为空,可以存在出列等待线程,所以在c==0的时候,且已经入列成功,可以唤醒出列等待线程,让其顺利出列。

出列-take : 出列,从队头弹出元素, 如果队列个数为0, 阻塞等待。

public E take() throws InterruptedException {
        E x;
         //队列临时容量缓存,作为执行唤醒/阻塞线程操作标记
        int c = -1; 
        //队列元素个数
        final AtomicInteger count = this.count;
         //出列锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//出列前加锁,可中断锁
        try {
            //自旋排除硬件加锁延时问题
            //如果队列已空,线程阻塞等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();  //数据出列
            //原子操作,出列后再获取队列元素个数,并-1,确保当前操作队列元素个数最新
            c = count.getAndDecrement();
            //c > 1 表示队列未空,仍可出列,唤醒未持锁而等待的出列线程
            if (c > 1)  
                notEmpty.signal();
        } finally {
            takeLock.unlock();//释放锁
        }
        //c == capacity 说明队列已满,唤醒出列线程出列 
        if (c == capacity)
            signalNotFull();
        return x;
    }

上面源码看出,take操作跟put原理差不多,执行的是反向操作而已。需要注意的是take方法唤醒线程条件c 变量值判断条件。
1> 出列成功之后, c > 1 表示出列前队列至少有2个元素,所以出列成功后,唤醒未持锁而等待的出列线程
2>c == capacity 满足这个条件, 表示出列前,队列是满的,可能存在入列等待线程,出列成功之后,解除等待,唤醒入列等待线程。

删除-remove : 根据指定元素删除队列中的元素,如果有删除,如果没有返回false

public boolean remove(Object o) {
        if (o == null) return false;   //参数为null 直接返回
        fullyLock();  //为保证入列出列线程安全,加双锁
        try {
            //循环遍历列表,删除指定元素
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);  //元素删除
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();  //是否双锁
        }
    }
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        //删除元素后,队列元素个数-1,唤醒入列等待线程
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

LinkedBlockingQueue 在执行删除操作,需要对takeLock 跟 putLock同时加锁,其目标确保在删除期间,其他线程无法操作队列,进而保证删除操作的线程安全。

总结

LinkedBlockingQueue 使用2把锁确保线程安全,入列时使用putLock,出列时使用takeLock,这种锁分离操作机制,在一定层面上提高队列的吞吐量,在高并发的情况下生产者(入列线程)和消费者(出列线程)可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353