Java 多线程(九):ArrayBlockingQueue 与 LinkedBlockingQueue

什么是阻塞队列?

  • 阻塞队列与我们平常接触到的普通队列(ArrayList)的最大不同点在于阻塞队列的添加和删除方法都是阻塞的
    • 阻塞添加:当阻塞队列元素已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行元素加入操作
    • 阻塞删除:当队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作

BlockingQueue

  • BlockingQueue 能够解决多线程中如何高效安全的传输数据的问题,帮助我们快速搭建高质量的多线程程序
  • 通过队列,可以使得数据由队列的一端输入,从另一端输出
  • 适用场景:生产者线程在一端生成,消费者线程在另一端消费
BlockingQueue

BlockingQueue 主要方法

  • 插入方法:
    • add(E e):
      • 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
      • 成功返回 true,失败抛 IllegalStateException 异常
    • offer(E e,long timeout,TimeUnit unit):
      • 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
      • 可设置超时时间,该方法可以中断
      • 成功返回 true,如果队列已满,返回 false
    • put(E e):
      • 将元素插入到队尾,如果队列已满则一直等待(阻塞)
  • 删除方法:
    • remove(Object o):
      • 移除指定元素,成功返回 true,失败返回 false
    • poll(long timeout, TimeUnit unit):
      • 获取并移除此队列的头元素,在指定等待的时间前一直等到获取元素
    • take():
      • 获取并移除队列头元素,若没有元素则一直阻塞
  • 检查方法:
    • element():
      • 获取但不移除队列的头元素,没有元素则抛异常
    • peek():
      • 获取但不移除队列的头,若队列为空则返回 null

BlockingQueue 基本使用

  • 该例子中使用 ArrayBlockingQueue,生产者(Producer)将字符串插入共享队列中,消费者将它们取出
public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new ArrayBlockingQueue(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
}
  • 生产者通过 put() 方法将元素插入共享队列中
public class Producer implements Runnable {

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 消费者通过 take() 方法从队列中取出元素,take() 方法会阻塞直到获取元素为止
public class Consumer implements Runnable {

    private BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

BlockingQueue 接口实现类与源码分析

ArrayBlockingQueue

  • 分析:
    • 基于数组的阻塞队列实现,内部维护了一个数组用于缓存队列中的数据对象
    • 有两个 Integer 类型的索引,指向添加、获取下一个元素的位置的索引
    • 并通过一个重入锁 ReentrantLock 和两个 Condition 条件来实现阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存储数据的数组 */
    final Object[] items;

    /**获取数据的索引,主要用于take,poll,peek,remove方法 */
    int takeIndex;

    /**添加数据的索引,主要用于 put, offer, or add 方法*/
    int putIndex;

    /** 队列元素的个数 */
    int count;


    /** 控制并非访问的锁 */
    final ReentrantLock lock;

    /**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
    private final Condition notEmpty;

    /**notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
    private final Condition notFull;

    /**
       迭代器
     */
    transient Itrs itrs = null;

}

  • 分析:
    • add 方法实际上是调用了 offer 方法
    • enqueue(E x) 方法内部通过 putIndex 索引直接将元素添加到数组 item 中,当 putIndex 索引大小等于数组长度时,需要将 putIndex 重新设置为 0,这是因为当前队列元素总是从队头获取,从队尾添加
//add方法实现,间接调用了offer(e)
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//offer方法
public boolean offer(E e) {
     checkNotNull(e);//检查元素是否为null
     final ReentrantLock lock = this.lock;
     lock.lock();//加锁
     try {
         if (count == items.length)//判断队列是否满
             return false;
         else {
             enqueue(e);//添加元素到队列
             return true;
         }
     } finally {
         lock.unlock();
     }
 }

//入队操作
private void enqueue(E x) {
    //获取当前数组
    final Object[] items = this.items;
    //通过putIndex索引对数组进行赋值
    items[putIndex] = x;
    //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;//队列中元素数量加1
    //唤醒调用take()方法的线程,执行元素获取操作。
    notEmpty.signal();
}

  • 分析:
    • put 方法是一个阻塞方法,如果队列元素已满,那么当前线程将会被 notFull 条件对象挂起加入到等待队列中,直到有空挡才会唤醒执行添加操作
//put方法,阻塞时可中断
 public void put(E e) throws InterruptedException {
     checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//该方法可中断
      try {
          //当队列元素个数与数组长度相等时,无法添加元素
          while (count == items.length)
              //将当前调用线程挂起,添加到notFull条件队列中等待唤醒
              notFull.await();
          enqueue(e);//如果队列没有满直接添加。。
      } finally {
          lock.unlock();
      }
  }

LinkedBlockingQueue

  • 基于链表的阻塞队列,内部维护着一个链表构成的缓冲队列,用于缓存队列中的数据对象
  • 在正常情况下链表阻塞队列的吞吐量要高于数组的阻塞队列(ArrayBlockingQueue),因为其内部实现添加和删除操作使用了两个 ReentrantLock 来控制并发执行(插入、获取各有一个锁),而 ArrayBlockingQueue 内部只使用一个 ReentrantLock 控制并发
  • 它与 ArrayBlockingQueue 的 API 几乎一致但内部实现原理不太相同
  • 当创建一个 LinkedBlockingQueue 时,默认阻塞队列中元素的数量大小为 Interger.MAX_VALUE

LinkedBlockingQueue 和 ArrayBlockingQueue 的区别

  • 队列大小有所不同:ArrayBlockingQueue 必须指定队列大小,而 LinkedBlockingQueue 默认为 Integer.MAX_VALUE(当元素添加速度大于移除速度时,需要注意一下,以免内存溢出)
  • 实现结构不同:ArrayBlockingQueue 采用数组实现、而 LinkedBlockingQueue 采用链表实现
  • 由于 ArrayBlockingQueue 采用数组存储队列元素,因此再插入、删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 每次插入都会生成一个新的结点(Node)对象,这会影响日后 GC 垃圾回收
  • ArrayBlockingQueue 中添加、删除操作只使用一个锁(ReentrantLock),而 LinkedBlockingQueue 添加、删除操作各使用一个锁,因此 LinkedBlockingQueue 的并发吞吐量大于 ArrayBlockingQueue
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容