JUC 阻塞队列

概述

阻塞队列解决的问题:在一个容量有限的仓库里面,实现满了就挂起生产线程,空了就挂起消费线程的兼顾性能和安全的数据结构

接口的方法:

  1. 失败抛出异常: add、remove
  2. 阻塞等待:put、take
  3. 快速反馈:offer、pull、offer(timeout)

常用阻塞队列特性总结:通过加锁实现安全地读写(size、contains函数也是加锁读),通过condition或者AtomicInteger来协调生产和消费

  1. ArrayBlockQueue:1把锁,无法并发写,2个condition,通过await、signal来实现线程调度。FIFO队列的定长阻塞队列
  2. LinkedBlockQueue:2把锁,支持生产、消费并发执行,但是不支持并发的生产,通过AtomicInteger和CAS来控制库存。可以指定大小,默认无界队列。
  3. SynchronousQueue:没有容量的队列,put、take成为一对儿才可以执行,否则阻塞,add和remove如果没有配对成功,直接报错。用于快速响应的业务场景
  4. PriorityBlockingQueue:有优先级的队列,插入元素实现Compare接口,内部维护了一个最小堆,一把锁,基于数组,自动扩容的无界队列

ArrayBlockQueue

Demo

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue queue=new ArrayBlockingQueue(10);
        for(int i=0;i<20;i++){
            queue.put("obj:"+i); //阻塞
            System.out.println("put "+i);
        }
    }
  1. 基于定长数组的有界队列,初始化的时候必须制定队列的大小,1个锁,2个condition对象
 final Object[] items;
 int takeIndex;
 int putIndex;
 int count;
 final ReentrantLock lock;
 private final Condition notEmpty;
 private final Condition notFull;
  1. 严格的先进先出队列,基于takeIndex、putIndex、length、count等参数实现FIFO
    实现原理:就是一个封装了的一个Lock,2个condition,套路一样:
  • 第一步加锁
  • 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
  • 第三步调用signal()消费线程
  • 第四步释放锁
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        - 第一步加锁
        lock.lockInterruptibly();
        try {
            - 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
            while (count == items.length){
                notFull.await();
                }
                
            insert(e);
        } finally {
            - 第四步释放锁
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        - 第三步调用signal()消费线程
        notEmpty.signal();
    }

LinkedBlockQueue

  1. LinkedBlockQueue内部是基于单向链表的队列,可以设置capacity来实现有界队列,也可以不设置,默认是无界队列。无界队列的时候,put、add、offer就是一样的啊,因为没有限制,所以插入肯定成功.
  2. 内部的实现机制是2把锁,各有一个condition,通过AtomicInteger来指定库存,CAS操作来自增或者自减库存,从而实现线程安全
  3. 生产线程和消费线程可以并行的进行操作,提升性能
  4. 生产线程和消费线程的交互和常规的方式不一样,生产线程只在当前库存为0的时候,才会触发 消费线程的signal(),进行了性能优化,因为当库存不为0的时候,不会有消费线程阻塞。
static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
private transient Node<E> head;
private transient Node<E> last;
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

private void enqueue(Node<E> node) {
        last = last.next = node;
    }
  1. put的过程
  • 对putLock加锁
  • while循环判断是否库存满了,满了就阻塞,未满就入队,然后CAS的方式自增并返回老的库存值
  • 如果新库存仍然未满,会触发其它的生产线程生产
  • 释放putLock锁
  • 如果原库存为0,会触发其它的消费线程开始消费
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

SynchronousQueue(同步队列)

  1. 没有容量的一个特殊队列
  2. 执行阻塞的方法 put 和 take的时候正常,所有的put方法有一个阻塞的公平队列或者非公平队列,所有的take操作也有一个类似的队列
  3. 执行非阻塞的方法 add()和remove()方法时,必须有对应的take和put方法阻塞着,不然就会报错
  4. peek(返回首位但是不删除元素)永远返回null,因为不存储元素
public static void main(String[] args) throws Exception {
        
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();//初始化不能带长度
        Thread t1 = new Thread(new Runnable() {     
            @Override
            public void run() {
            try {
                String str = queue.take();   //线程1在获取,这是阻塞的,当线程2一添加,线程1就获取,因为SynchronousQueue是没有容量的
                System.out.println("take:"+str);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            }
        });
        t1.start();
                
        Thread t3 = new Thread(new Runnable() { 
            @Override
            public void run() {
            try {
                queue.put("abcde");
            } catch (InterruptedException e) {
            } 
            System.out.println("add:"+"abcde");
            }
        });
        t3.start();
    }

PriorityBlockingQueue

  1. 有优先级但是无界的阻塞队列,类似于List,支持自动扩容,可以指定初始化大小,也可以不指定。实际是一个Arr[]
  2. 内部有一个最小堆,插入和取出的时候,都要构建堆有序
  3. add()的时候不会报错,因为容量无限
  4. 支持元素实现Compare接口,或者PriorityBlockingQueue初始化的时候传入一个Compare接口实现类,两种方式进行比较优先级
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>(); //因为是无界队列,初始化可以不定义长度
        Task t1 = new Task();
        t1.setId(1);
        t1.setName("任务 1");
        
        Task t2 = new Task();
        t2.setId(4);
        t2.setName("任务 2");
        
        Task t3 = new Task();
        t3.setId(3);
        t3.setName("任务 3");

        
        queue.add(t2);
        queue.add(t3);
        queue.add(t1);

        queue.take(); // 取出来最小的 t1 ,任务1

DelayQueue

  1. 内部有一个PriorityBlockingQueue,最先到期的元素放在堆顶。
  2. 里面的元素必须要实现Delayed接口
 入队等待线程
 DelayQueue< Student> students = new DelayQueue<Student>();
 students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));

 出队消费线程
 while(!Thread.interrupted()){
    students.take().run();
 }

参考资料

BlockQueue详细介绍
https://blog.csdn.net/qq_38872310/article/details/80832703

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355