ArrayBlockingQueue源码分析

前面几天陆续写了关于ArrayList和LinkedList的源码分析,其实都比较简单,稍微有点代码基础的人都应该能看的非常明白。我更博客的过程也是自己逐渐学习的过程,所以希望随着博客的更新,在内容上难度也逐渐增大。近期的博客都是以列表为主,所以今天就来看看线程安全的ArrayBlockingQueue

ArrayBlockingQueue简单介绍

首先这个队列是FIFO的单向队列,头部的对象呆的时间最久,尾部的对象呆的时间最长。他与ArrayList的最大却别在于它是线程安全的,并且长度不会自动增加,它在存取的功能上更加丰富,支持阻塞和非阻塞的存取。通常我们会在初始化线程池的时候为了防止有的线程池等待队列的无限增长,我们会在初始化的时候指定一个长度一定的阻塞队列,然后指定一个任务处理策略,这就是它使用比较多的场景了。

源码分析

类定义

    /**
     * 没有一些其他的接口值得我们过多介绍,都是一些比较常规的方法抽象
     */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

属性分析

    //实际存放元素内容的数组
    final Object[] items;

    //队列中下一个被取出的值的位置下标
    int takeIndex;

    //队列中下一个存放元素的位置下标
    int putIndex;

    //实际元素数量
    int count;

    //内部通用锁
    final ReentrantLock lock;
    //take时候用于等待的条件
    private final Condition notEmpty;
    //put时候用于等待的条件
    private final Condition notFull;
    //这里插一句,Condition必须依赖于ReentrantLock,否则没有多大意义。它相对于Object.wait就是增加了更细粒度的控制条件,有机会的话后面会分析。

构造函数

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    //所以这个fair就是初始化内部的锁使用的,默认使用非公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    /**
     *这个构造函数的核心就是c.size()与capacity的大小关系对比了
     *如果c.size()>capacity那就会报错,所以在初始化的时候要注意
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        //这种写法我们很常见,使用final表示引用不能改变,但又避免了直接使用成员变量
        final ReentrantLock lock = this.lock;
        lock.lock(); 
        try {
            int i = 0;
            try {
                for (E e : c) {//ArrayBlockingQueue不支持null元素噢
                    checkNotNull(e);
                    items[i++] = e;
                }
            //统一异常处理,但是我觉得这里是不是包装一下ex更为合理一些
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            //如果数组全部被沾满的话就开始进入循环状态(数组长度就是队列深度)
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

方法介绍

我们先看一下很多方法公用的私有方法,因为下面的很多public方法会对这些方法调用
    final int inc(int i) {//循环递增(源码里面经常出现一句话搞定这种写法)
        return (++i == items.length) ? 0 : i;
    }
    final int dec(int i) { //原理同上
        return ((i == 0) ? items.length : i) - 1;
    }
    //类型转换
    static <E> E cast(Object item) {
        return (E) item;
    }
    final E itemAt(int i) {
        return this.<E>cast(items[i]);
    }
    /**
     * 这个插入方法是所有公用插入方法的核心插入逻辑
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex); //因为可能循环,所以不能直接putIndex++
        ++count;
        notEmpty.signal(); //有这个条件说明这个类在调用的时候一定要持有锁,否则这个notEmpty就没有用。插入之后要通知一下可能在put时候进入阻塞状态的线程
    }
    /**
     * 取出操作比插入操作多了类型转换这个操作(内部的数组是Obejct类型),这个也要求在调用时候持有锁对象
     */
    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }
    /**
     *
     */
    void removeAt(int i) {
        final Object[] items = this.items;
        //如果要移除的元素恰好就是下一个出队的元素
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {//讲i元素后面的元素全部前移一个位置(这里不用System.arraycopy的原因大家自己脑补)
                    items[i] = items[nexti];
                    i = nexti;
                } else { //如果要移除的元素是putIndex的前一个元素(说明移除不会造成元素移动)
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }
    
基础的内部核心方法就是上面这些了,接下来我们看看常用的public方法具体实现

---
    /**
     *add会在队列满的时候直接报错
     */
    public boolean add(E e) {
        return super.add(e);
    }
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    /**
     *这里我们就知道其实add调用的还是offer方法
     *offer在队列满的情况下会直接返回false,不会阻塞
     */
    public boolean offer(E e) {
        checkNotNull(e);//不能插入空元素
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//如果队列满的话直接放回false
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    /**
     *put在队列满的情况下会直接阻塞,但是可以中断其阻塞
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//可以响应中断(所以一旦put阻塞后可以调用interrupt来中断)
        try {
            while (count == items.length)
                notFull.await(); //阻塞!!!
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    /**
     *offer(E,long,TimeUnit)会在等待一段时间后返回,但是等待的过程中是可以响应中断的
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    /**
     *pull方法在队列为空的情况下直接返回null,不会阻塞
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }
    /**
     *take跟put相对应,在队列为空的情况下会阻塞,但是可以响应中断
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    /**
     *指定等待时间的pull,可以等待指定时间,响应中断
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)//即使nanos小于零,但是队列有元素返回的之后就直接返回了,不会考虑这个值
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }
    /**
     *peek=偷窥,没有就直接返回null,不会阻塞
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }
    /**
     *为了保证线程安全,还是加了必要的同步操作
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }
    //contains(Object)跟remove方法一样,不用看了
    public boolean remove(Object o) {
        if (o == null) return false; //本身就不存储null元素
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//这里可不敢i++,毕竟是抽象的循环数组,i控制元素下标,k控制具体数量
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     *为了保证线程安全,还是加了必要的同步操作
     */
    public <T> T[] toArray(T[] a) {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int count = this.count;
            final int len = a.length;
            if (len < count) //这里跟List里面的toArray注意地方一样
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(), count);
            for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
                a[k] = (T) items[i];
            if (len > count)
                a[count] = null;//清除多余的引用
            return a;
        } finally {
            lock.unlock();
        }
    }
    
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                items[i] = null;
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll(); //注意,清空就是大幅度的take操作一样,一定最后要signalAll()来唤醒等待在notFull上的线程
        } finally {
            lock.unlock();
        }
    }
    
    /**
     *将队列的元素全部输出到c里面(不能自己输到自己噢)
     */
    public int drainTo(Collection<? super E> c) {
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;//代表输出成功的元素数量
            int max = count;
            while (n < max) {
                c.add(this.<E>cast(items[i]));
                items[i] = null; //清空自身数组
                i = inc(i);
                ++n;
            } //如果队列本省就是空的话就不会变动putIndex和tableIndex,否则全部置0
            if (n > 0) {
                count = 0;
                putIndex = 0;
                takeIndex = 0;
                notFull.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
基本常用的方法都已经讲过了,之后再看下迭代操作的相关代码

    private class Itr implements Iterator<E> {}
    private int remaining; // 剩下要返回的元素数量
    private int nextIndex; // 下一个需要返回的元素下标
    private E nextItem;    // 下一个需要返回的元素
    private E lastItem;    // 最后一个调用next操作返回的元素
    private int lastRet;  //最后一个调用next操作返回的元素的下标
        
    Itr() {
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            lastRet = -1;
            if ((remaining = count) > 0)//如果现在一个元素还没取
                nextItem = itemAt(nextIndex = takeIndex);
        } finally {
            lock.unlock();
        }
    }
    public boolean hasNext() {
            return remaining > 0;
    }
    public E next() {
        //跟外部公用一个锁
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            if (remaining <= 0)
                throw new NoSuchElementException();
            lastRet = nextIndex;
            E x = itemAt(nextIndex);  //这里非常有意思,现在的x是比nextItem更新的值
            if (x == null) {
                x = nextItem;         //新值为空的话我就返回老值
                lastItem = null;      //确保移除上次的失败值
            }
            else
                lastItem = x;
            //这里进行跳过空值的操作,但是在上面操作的时候已经充分保证了没有空值插进来,为什么还要进一步预防Null呢?对这点保持疑问
            while (--remaining > 0 && 
               (nextItem = itemAt(nextIndex = inc(nextIndex))) == null);
            return x;
        } finally {
            lock.unlock();
        }
    }
    public void remove() {
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            int i = lastRet;
            if (i == -1) //说明还没有调用next就直接remove
                throw new IllegalStateException();
            lastRet = -1;
            E x = lastItem; //如果lastItem没有调用过next()的情况下有可能是null
            lastItem = null;
            //只有在lastItem还在i位置时候才进行移除操作
            if (x != null && x == items[i]) {
                boolean removingHead = (i == takeIndex);
                removeAt(i);
                if (!removingHead) //如果不是移除头部nextIndex的话需要前移,但是整个数组对象是没有移动操作的
                    nextIndex = dec(nextIndex);
            }
        } finally {
            lock.unlock();
        }
    }

列表系列的源码分析还在继续。。。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容