以LinkedBlockingQueue为例浅谈阻塞队列的实现

目录

最近在阅读Spark源码的过程中,又重新接触到了一些Java并发方面的知识,于是就见缝插针地将它们记录下来,当做复习与备忘。

阻塞队列简介

阻塞队列的定义

根据Doug Lea在JavaDoc中的解释,所谓阻塞队列,就是在普通队列的基础之上,支持以下两种操作的队列:

  • 当某线程从队列获取元素时,如果队列为空,就等待(阻塞)直至队列中有元素;
  • 当某线程向队列插入元素时,如果队列已满,就等待(阻塞)直至队列中有空间。

也就是说,阻塞队列是自带同步机制的队列。它最常用来解决线程同步中经典的生产者-消费者问题,前面讲过的Spark Core异步事件总线中,就采用阻塞队列作为事件存储。

Java中的阻塞队列

Java中阻塞队列的基类是j.u.c.BlockingQueue接口,它继承自Queue接口,并且定义了额外的方法实现同步:

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

上述put()与offer()方法用于向队列插入元素,take()与poll()方法则是从队列获取元素。不同的是,put()与take()方法在插入/获取时,如果必须等待,就会一直阻塞下去;而offer()与poll()方法可以指定阻塞的时间长度。

以BlockingQueue接口为中心的继承关系如下图所示。


平时开发中比较常用的阻塞队列是基于数组实现的ArrayBlockingQueue,与基于单链表实现的LinkedBlockingQueue。本文选择后者来深入看一下阻塞队列的实现细节,因为它的性能在多数情况下更优,可以自行写benchmark程序来测测。

LinkedBlockingQueue

LinkedBlockingQueue(以下简称LBQ)是基于单链表实现的,先进先出(FIFO)的有界阻塞队列。

单链表定义

LBQ的单链表结点数据结构定义在静态内部类Node中。

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

在类的内部还定义了单链表的头结点与尾结点。

    transient Node<E> head;
    private transient Node<E> last;

head始终指向链表的第一个结点,该结点是哨兵结点,不存储数据,只标记链表的开始,即head.item == null。这样可以避免只有一个结点时造成混乱。
tail始终指向链表的最后一个结点,该结点是有数据的,并满足last.next == null

LBQ在队头获取及弹出元素,在队尾插入元素。

锁和等待队列

LBQ采用双锁机制保证入队和出队可以同时进行,互不干扰。

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

可见定义有两个ReentrantLock,takeLock用于控制出队,putLock用于控制入队。另外,还有这两个锁分别对应的条件变量notEmpty和notFull,分别维护出队和入队线程的等待队列。ReentrantLock和Condition都是Java AQS机制的重要组成部分,之后也会细说。

值得注意的是,在某些方法中需要同时对takeLock与putLock加锁与解锁,所以LBQ内部也提供了这样的方法。

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

这两个方法总会成对调用,保证所有需要同时加锁和解锁的地方,其顺序都一致并且不可中断,也防止了前一个锁操作成功执行,后一个锁操作被打断导致死锁的风险。

另外,LBQ也对条件变量的Condition.signal()方法进行了简单封装,分别用来唤醒阻塞的出队操作线程和入队操作线程。

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

容量和计数

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();

capacity是LBQ的最大容量,可以在构造方法中随同名参数传入,默认值是Integer.MAX_VALUE。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

count则是LBQ内当前元素的计数,由于入队和出队动作可以并发执行,所以要用原子类型AtomicInteger保证线程安全。

入队操作

由于put()和offer()方法的逻辑基本相同,所以只看offer()方法就好了。

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

在入队时,首先将putLock加锁,然后用卫语句count.get() == capacity判断队列是否已满,若已满,则进入等待循环。当阻塞的时间超时后,判定入队操作失败,并返回false。
如果队列未满,或者在超时时间未到时有了空间,就调用enqueue()方法在队尾插入元素,并将计数器自增。入队后若还有更多的剩余空间,则唤醒其他等待的入队线程。
最后将putLock解锁,并检查由count.getAndIncrement()返回的值是否为0。如果为0,表示队列刚刚由空变为非空状态,因此也要唤醒等待的出队线程。

出队操作

同理,只看poll()方法。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

将讲解入队方法时的描述反着说一遍就行了:
在出队时,首先将takeLock加锁,然后用卫语句count.get() == 0判断队列是否为空,若为空,则进入等待循环。当阻塞的时间超时后,判定出队操作失败,并返回false。
如果队列不为空,或者在超时时间未到时进了新元素,就调用dequeue()方法弹出队头元素,并将计数器自减。出队后若还有更多的剩余元素,则唤醒其他等待的出队线程。
最后将takeLock解锁,并检查由count.getAndDecrement()返回的值是否为capacity。如果为capacity,表示队列刚刚由满变为不满状态,因此也要唤醒等待的入队线程。

需要操作双锁的情况

以remove()方法为例。

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

由于单链表删除结点涉及到对链表的遍历,以及对前驱和后继结点的断链和补链,因此必须将两个锁都加上,禁止一切修改。待删除成功后才能解锁,继续正常的入队和出队操作。

生产者-消费者问题示例

生产者-消费者问题的解决方法用操作系统理论中的信号量PV(wait-signal)原语描述如下:

semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;

procedure producer() {
  while (true) {
    item = produce();
    wait(empty);
    wait(mutex);
    buffer.put(item);
    signal(mutex);
    signal(filled);
  }
}

procedure consumer() {
  while (true) {
    wait(filled);
    wait(mutex);
    item = buffer.get();
    signal(mutex);
    signal(empty);
    consume(item);
  }
}

利用阻塞队列可以免去自己实现同步机制的麻烦,从而非常方便地实现。一个极简的示例如下:

public class ProducerConsumerExample {
    private static final int BUF_CAPACITY = 16;

    public static void main(String[] args) {
        BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);

        Thread producerThread = new Thread(() -> {
            try {
                while (true) {
                    long value = System.currentTimeMillis() % 1000;
                    blockingQueue.put(value);
                    Thread.sleep(value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "producer");

        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    System.out.println(blockingQueue.take());
                    Thread.sleep(System.currentTimeMillis() % 1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "consumer");

        producerThread.start();
        consumerThread.start();
    }
}

一个小(?)问题

在上面的代码(以及j.u.c包中很多类的代码)的方法体中,经常能看到类似以下的语句:

        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;

也就是有些类中定义的字段,在方法中使用时会先赋值给一个局部变量。这样做到底是为了什么?以目前我所了解到的而言,还没有特别确切的答案,但可以确定是一个非常微小的优化,与JVM及缓存有关。

以下是reference传送门:

顺便,StackOverflow最近(不知道是哪一天)改版成了1998年的样式,满满的怀旧感。上面concurrency-interest邮件列表中关于这个问题也是众说纷纭,如果仔细爬楼还会发现Doug Lea本人的回复,不过有些令人费解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351