这个阻塞队列明明超强却过分慎重


这标题取的hhhh,以后基本都会是类似的标题,没办法,文章还是要正经一点写,只能在标题整下活了。一天不整活我就浑身难受!

我话说完,简单聊聊阻塞队列,谁赞成?谁反对?


我们假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的生产者-消费者模式。

我们自己coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。
这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。

接下来将使用阻塞队列实现类ArrayBlockingQueue进行源码分析。

ArrayBlockingQueue

成员变量

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    ...

  • final Object[] items : 储存元素的数组
  • int takeIndex : 用于出队的指针
  • int putIndex : 用于入队的指针
  • int count : 队列中元素个数
  • final ReentrantLock lock : 可重入锁
  • private final Condition notEmpty : 消费者监视器
  • private final Condition notFull : 生产者监视器

构造方法

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
                                //容量        //是否为公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
                                //容量
    public ArrayBlockingQueue(int capacity) {
        //默认为非公平锁
        this(capacity, false);
    }
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
                                //容量        //是否为公平锁
    public ArrayBlockingQueue(int capacity, boolean fair,
                                //初始化阻塞队列应包含的元素的集合
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    //是否为空,为空抛NPE
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
  • 注意 :此构造方法中使用了锁,而jdk给出的注释是 此处加锁只是为了可见性,而非互斥性

这里简单说一下为什么需要保证可见性

对象创建不是一个原子性操作,对象创建的过程需分为以下三个步骤

  1. 分配内存空间;
  2. 初始化对象;
  3. 将内存空间的地址赋值给对应的引用。

当构造方法返回时即返回了新创建的对象地址,将其赋给相对应的引用,可理解为当构造方法返回时即完成了对象创建的第三个步骤。然而,为了使程序运行更加高效,编译器会对指令的执行顺序进行一定的优化,即指令重排序

当发生指令重排序时,可能发生步骤2和步骤3颠倒的情况,当出现该情况时,我们可以粗糙地理解为本应在构造方法中对成员变量进行初始化的代码跑到构造方法外边执行,也就是当构造方法返回时,即已经返回了内存空间的地址,然而此时构造方法中对成员变量的初始化工作还未开始

  • 注意:步骤1是不会发生重排序的,因为步骤2和步骤3都需要依赖步骤1的执行完成才能得以执行

虽然是这样,在单线程环境下是不会影响最终结果的,而在并发环境下往往就是一颗炸弹


我们举个并发环境中的例子:

我们使用A线程进行对象的创建,B线程操作A线程创建的阻塞队列,我们在A线程中调用了构造方法

若此时发生了指令重排序,就会出现A线程已经拿到了该阻塞队列对象的地址之后,该阻塞队列还未初始化完成的情况,此时B线程看见A线程已经创建好了对象,于是B线程开始了自己的工作,而此时阻塞队列还未初始化完成,就出现了并发安全问题

  • 注意 :被final修饰的变量不会发生重排序,故当this(capacity, fair)返回时,用final修饰的成员变量已全部初始化完成

使用锁的话能够防止指令重排序,因为ReentrantLock继承了AQSAQS底层使用了到了volatile关键字,volatile底层使用到了内存屏障以防止指令重排序,并且在解锁时能够将同步代码块中的变量值刷回主存,保证了可见性

核心方法

put 入队

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //获取锁,阻塞可被中断
        lock.lockInterruptibly();
        try {
            //若队列满
            while (count == items.length)
                //该线程作为生产者被挂起
                notFull.await();
            //入队
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }


    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        //循环队列
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //唤醒消费者线程
        notEmpty.signal();
    }

take 出队

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //获取锁,阻塞可被中断
        lock.lockInterruptibly();
        try {
            //若队列为空
            while (count == 0)
                //该线程作为消费者挂起
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        //循环队列
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //itrs同于与当前正活跃的迭代器共享状态,如果存在,通知其一起删除该元素
        if (itrs != null)
            itrs.elementDequeued();
        //唤醒生产者线程
        notFull.signal();
        return x;
    }

总的来说,ArrayBlockingQueue的源码相对简单,但在构造方法处的涉及了比较多的Java并发基础知识。还涉及了锁的一些基本知识,还有AQS...
并发难,好难,真的难。有时候一个知识点就能费你一个下午的时间,有时候好像自己想通了,转身又给忘了QAQ
有空再讲讲并发基础知识吧,volatile,synchronized,Java内存模型,锁,AQS...


以上!


参考文献:

深入浅出Java多线程
分析 ArrayBlockingQueue 构造函数加锁问题

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。