多线程之阻塞队列

阻塞队列

BlockingQueue

队列主要有两种:FIFO(先进先出)、LIFO(后进先出)。
再多线程环境中,队列很容实现数据共享,我们常用的"生产者"、"消费者"模型就可以通过队列来传递数据达到数据共享。但是现实中,大多数情况都是生产者产生消息的速度和消费的速度是不匹配的,就需要相应的对生产或者消费进行阻塞。当生产的消息积累到一定程度时,就需要对生产者就行阻塞,以便消费者将积累的消息进行消费。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全。BlockingQueue释放了我们的双手,他让我们不用关系什么时候去阻塞,什么时候去唤醒线程。

抛异常 返回false 阻塞 超时,抛异常
插入 add offer put offer(timeout)
移除 take remove poll(timeout)
检查 contains

操作方法:

//--------添加 ----------
boolean add(E e);        //添加元素,加不了抛异常
boolean offer(E e);      //添加元素,加不了返回false
void put(E e) throws InterruptedException;  //添加元素,加不了一直阻塞
boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException; //添加元素,达到指定时间没有加入抛异常
// -------移除-----------
boolean remove(Object o);
E poll(long timeout, TimeUnit unit)
       throws InterruptedException;
E take() throws InterruptedException;
// ------

常见的BlockingQueue

有界性 数据结构
ArrayBlockingQueue bounded 加锁 ArrayList
LinkedBlockingQueue optionally-bounded 加锁 LinkedList
DelayQueue unbounded 加锁 heap
PriorityBlockingQueue unbounded 加锁 heap
SynchronousQueue bounded 加锁

1. ArrayBlockingQueue

基于数组实现的有界阻塞安全线程队列。
构造函数

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //初始化重入锁
        lock = new ReentrantLock(fair);
        //初始化读、写等待队列
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
       //初始化构造器
        this(capacity, fair);
        //获取重入锁
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
           //初始化元素中的个数
            count = i;
           //插入元素的下标索引
            putIndex = (i == capacity) ? 0 : i;
        } finally {
           //释放锁
            lock.unlock();
        }
    }

相关属性

final Object[] items;        //存放元素的数组
int takeIndex;                 //取元素的下标索引
int putIndex;                  //存元素的下标索引
int count;                      //数组中的元素个数
final ReentrantLock lock;   //数据读取的可重入锁
private final Condition notEmpty; //读等待的队列
private final Condition notFull;    //写等待的队列

核心函数
put

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
       //如果当前线程没有中断,就将当前线程锁定
        lock.lockInterruptibly();
        try {
            //当前队列已经满了就一直等待
            while (count == items.length)
                notFull.await();
            //插入元素
            enqueue(e);
        } finally {
           //释放锁
            lock.unlock();
        }
    }

take

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

2. LinkedBlockingQueue

基于链表实现的阻塞队列
构造函数

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

相关属性

private final int capacity;           //元素个数
private final AtomicInteger count = new AtomicInteger();  //
transient Node<E> head;             //头节点
private transient Node<E> last;     //尾节点
private final ReentrantLock takeLock = new ReentrantLock();  //读可重入锁
private final Condition notEmpty = takeLock.newCondition();  //读等待队列
private final ReentrantLock putLock = new ReentrantLock(); //写可重入锁
private final Condition notFull = putLock.newCondition();  //写队列

核心函数
put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
       //获取写的可重入锁
        final ReentrantLock putLock = this.putLock;
        //线程安全的原子操作类
        final AtomicInteger count = this.count;
        //判断线程是否中断
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //如果加1后还小于当前容量,则唤醒一个等待的线程
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
3. DelayQueue

DelayQueue每次都是将元素加入排序队列,以delay/过期时间为排序因素,将快过期的元素放在队首,取数据的时候每次都是先取快过期的元素。
构造方法

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

相关属性

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();  //根据队列里某些元素排序的有序队列
private final Condition available = lock.newCondition();
private Thread leader = null;

核心函数
offer

public boolean offer(E e) {
        //获取可重入锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //将元素加入优先级队列中
            q.offer(e);
           //如果当前元素为队首,将leader=null,唤起其他线程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
           //释放锁
            lock.unlock();
        }
    }

take

public E take() throws InterruptedException {
       //获取可重入锁
        final ReentrantLock lock = this.lock;
        //判断当前线程是否中断,没有中断就将当前线程锁定
        lock.lockInterruptibly();
        try {
            //循环执行
            for (;;) {
                E first = q.peek();
                //如果队首为空,阻塞当前线程
                if (first == null)
                    available.await();
                else {
                    //获取当前元素过期时间
                    long delay = first.getDelay(NANOSECONDS);
                    //小于等于0 直接弹出
                    if (delay <= 0)
                        return q.poll();
                     //将first 只为null,避免内存泄漏
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        //阻塞当前线程
                        available.await();
                    else {
                        //将当前线程赋值给leader,然后阻塞delay时间,等待队首元素达到可出队时间
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                             //释放leader引用
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //如果leader元素为空,优先级队列不为空唤起其他线程
            if (leader == null && q.peek() != null)
                available.signal();
            //释放锁
            lock.unlock();
        }
    }
4. PriorityBlockingQueue

无界优先队列
构造函数

 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }

相关属性

private static final int DEFAULT_INITIAL_CAPACITY = 11;   //默认容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大容量
private transient Object[] queue;   //存放元素数组
private transient int size;  //元素个数
private transient Comparator<? super E> comparator;  //比较器
private final ReentrantLock lock;  //可重入锁
private final Condition notEmpty;  //非空条件
private transient volatile int allocationSpinLock; //扩容时,CAS更新这个值谁更新成功谁执行
private PriorityQueue<E> q;//不阻塞的优先队列,用于序列化/反序列化

核心函数
offer

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
         //判断是否需要扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            //根据是否有比较器选择不同方法
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            //唤醒notEmpty条件
            notEmpty.signal();
        } finally {
           //释放锁
            lock.unlock();
        }
        return true;
    }
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 取父节点
        int parent = (k - 1) >>> 1;
        // 父节点的元素值
        Object e = array[parent];
        // 如果key大于父节点,堆化结束
        if (key.compareTo((T) e) >= 0)
            break;
        // 否则,交换二者的位置,继续下一轮比较
        array[k] = e;
        k = parent;
    }
    // 找到了应该放的位置,放入元素
    array[k] = key;
}

take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    E result;
    try {
        // 队列没有元素,就阻塞在notEmpty条件上
        // 出队成功,就跳出这个循环
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        // 解锁
        lock.unlock();
    }
    // 返回出队的元素
    return result;
}
private E dequeue() {
    // 元素个数减1
    int n = size - 1;
    if (n < 0)
        // 数组元素不足,返回null
        return null;
    else {
        Object[] array = queue;
        // 弹出堆顶元素
        E result = (E) array[0];
        // 把堆尾元素拿到堆顶
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 并做自上而下的堆化
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 修改size
        size = n;
        // 返回出队的元素
        return result;
    }
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        // 只需要遍历到叶子节点就够了
        while (k < half) {
            // 左子节点
            int child = (k << 1) + 1; // assume left child is least
            // 左子节点的值
            Object c = array[child];
            // 右子节点
            int right = child + 1;
            // 取左右子节点中最小的值
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key如果比左右子节点都小,则堆化结束
            if (key.compareTo((T) c) <= 0)
                break;
            // 否则,交换key与左右子节点中最小的节点的位置
            array[k] = c;
            k = child;
        }
        // 找到了放元素的位置,放置元素
        array[k] = key;
    }
}
5. SynchronousQueue

双栈双队列算法,一个写SynchronousQueue需要和一个读SynchronousQueue组队出现
构造方法

public SynchronousQueue() {
        this(false);
    }
public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

相关属性

static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;

核心方法
put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
  E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

take

 public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容