阻塞队列

身份越来越多,自己越来越少。 — 《一念天堂》

写在前面

阻塞队列常用于生产者和消费者的场景,生产者就是往队列中放入元素,消费者就是从队列中获取元素,阻塞队列就是生产者存放元素的容器,而消费者也从该容器中拿元素。

阻塞队列有两种常见的阻塞场景,满足这两种阻塞场景的队列就是阻塞队列,分别如下:

  • 当队列中没有元素的情况下,消费者端的所有线程会被自动阻塞,直到生产者往队列中放入元素,线程会被自动唤醒。
  • 当队列中元素填满的情况下,生产者端的所有线程会被自动阻塞,直到消费者从队列中获取元素,线程会被自动唤醒。

Java中的阻塞队列

Java中提供了7个阻塞队列,分别如下:

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列,按照先进先出的原则对元素进行排序,支持公平锁和非公平锁。
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列,按照先进先出的原则对元素进行排序,默认长度为Integer.MAX_VALUE。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列,默认自然序对元素进行排序,可以自定义实现compareTo()方法指定排序规则,不保证同优先级元素的顺序。
  • DelayedQueue:使用优先级队列实现的无界阻塞队列,在创建元素时,可以指定多久才能从队列中获取元素,只有延时期满后才能从队列中获取元素。
  • SynchronousQueue:不存储元素的阻塞队列,每一个put操作都要等待take操作,否则不能添加元素,支持公平锁和非公平锁。
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列,相当于其他队列,多了transfer和tryTransfer方法。
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列,队首和队尾都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

ArrayBlockingQueue和LinkedBlockingQueue一般为常用的阻塞队列。

阻塞队列的使用

接下来通过一个Demo演示阻塞队列的用法。

public class MainActivity extends AppCompatActivity {

    private static final String TAG = MainActivity.class.getSimpleName();

    private ArrayBlockingQueue<String> mBlockingQueue = new ArrayBlockingQueue<>(10);

    private Producer mProducer;
    private Consumer mConsumer;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        mProducer = new Producer();
        mProducer.start();
        mConsumer = new Consumer();
        mConsumer.start();
    }

    @Override
    protected void onStop() {
        super.onStop();
        mProducer.stopProduct();
        mConsumer.stopConsume();
    }

    /**
     * 生产者
     */
    private class Producer extends Thread {

        private volatile boolean isStop;

        private int event;

        public void stopProduct() {
            isStop = true;
        }

        @Override
        public void run() {
            super.run();
            while (!isStop) {
                try {
                    // 事件 - 5 发送完成后,睡2秒
                    if (event == 5) {
                        Thread.sleep(2000);
                    }
                    // 事件 - 10 发送完成后,调用stopProduct()
                    if (event == 10) {
                        stopProduct();
                    }
                    event++;
                    // 队列中没有空余位置,生产者端的线程进入阻塞状态,直到消费者端的线程从队列中拿元素,唤醒生产者端的线程继续执行
                    mBlockingQueue.put("事件 - " + event);
                    Log.d(TAG, "生产者 生产事件 = " + event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 消费者
     */
    private class Consumer extends Thread {

        private volatile boolean isStop;

        public void stopConsume() {
            isStop = true;
        }

        @Override
        public void run() {
            super.run();
            while (!isStop) {
                try {
                    // 拿不到元素消费者端的线程进入阻塞状态,直到生产者端的线程往队列中放入元素,唤醒消费者端的线程继续执行
                    String event = mBlockingQueue.take();
                    Log.d(TAG, "消费者 消费事件 = " + event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

这里维护了一个ArrayBlockingQueue,并指定其大小为10,创建了一个生产者线程和一个消费者线程,生产者线程在生产5个事件后睡两秒钟,消费者线程在消费完“事件 - 5”后由于从队列中拿不到元素,就会自动阻塞,等待生产者往队列中放入元素,只要队列中有生产者放入元素,就会立即唤醒消费者线程继续获取元素,详见以下Log:

2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 1
2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 2
2019-09-02 21:23:58.822 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 1
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 2
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 3
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 3
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 4
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 4
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 5
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 5

生产者线程睡2秒,继续生产事件

2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 6
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 6
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 8
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 8
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 10
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 11
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 10
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 11

阻塞队列的原理

下面通过分析ArrayBlockingQueue的原理加深对阻塞队列的理解。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存放元素的数组 */
    final Object[] items;

    /** 队首元素下标 */
    int takeIndex;

    /** 队尾元素下标 */
    int putIndex;

    /** 当前队列中存放的元素总数 */
    int count;

    /** 重入锁 */
    final ReentrantLock lock;

    /** 等待获取元素的条件 */
    private final Condition notEmpty;

    /** 等待放入元素的条件 */
    private final Condition notFull;

    /** 构造函数,参数capacity为该队列的容量 */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /** 构造函数,参数capacity为该队列的容量,参数fair为重入锁是公平锁还是非公平锁 */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /** 构造函数,构造函数,参数capacity为该队列的容量,参数fair为重入锁是公平锁还是非公平锁,参数c为初始队列的元素集合 */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        // 得到锁
        lock.lock(); // Lock only for visibility, not mutual exclusion
        // 遍历元素集合,初始化元素数组
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    /**
     * 放入元素
     * 如果元素数组没有空余位置,不会阻塞消费者端的线程,直接返回false
     * 如果元素数组还有空余位置,调用enqueue()函数,并且返回true
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 得到锁
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    /**
     * 放入元素
     * 如果元素数组没有空余位置,调用notFull.awaitNanos(nanos),会使生产者端的线程进入阻塞状态等待一段时间,
     * 当等待超时后,如果元素数组依然没有空余位置,直接返回false
     * 如果元素数组还有空余位置,调用enqueue()函数,并且返回true
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 得到可中断的锁
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    /**
     * 放入元素
     * 如果元素数组没有空余位置,调用notFull.await()使生产者端的线程进入阻塞状态,
     * 直到有消费者从队列中获取元素并且会唤醒生产者端进入阻塞状态的线程继续执行
     * 如果元素数组还有空余位置,调用enqueue()函数
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 得到可中断的锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // 使生产者端的线程进入阻塞状态
                notFull.await();
            enqueue(e);
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    /**
     * 放入元素(核心函数)
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 唤醒消费端因队列没有元素获取而进入阻塞状态的线程继续执行
        notEmpty.signal();
    }

    /**
     * 获取元素
     * 如果元素数组中没有元素,则直接返回null,否则调用dequeue()返回队首元素
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        // 得到锁
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    /**
     * 获取元素
     * 如果元素数组中没有元素,则调用notEmpty.awaitNanos(nanos)使消费者端线程进入阻塞状态,
     * 直到有生产者往队列中放入元素并且会唤醒消费者端进入阻塞状态的线程继续执行
     * 如果元素数组中还有元素,调用dequeue()函数返回队首元素
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 得到可中断的锁
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    
    /**
     * 获取元素
     * 如果元素数组没有元素,调用notEmpty.await()使消费者端的线程进入阻塞状态,
     * 直到有生产者往队列中放入元素并且会唤醒消费者端进入阻塞状态的线程继续执行
     * 如果元素数组还有元素,调用enqueue()函数返回队首元素
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 得到可中断的锁
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // 使消费者端的线程进入阻塞状态
                notEmpty.await();
            return dequeue();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    /**
     * 获取元素(核心函数)
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒生产者端因队列元素填满而进入阻塞状态的线程继续执行
        notFull.signal();
        return x;
    }
}

总结

在生产者消费模型中,生产数据和消费数据的速率不一致,如果生产数据速度快一些,消费不过来,就会导致数据丢失,这时候我们就可以使用阻塞队列来解决这个问题。

阻塞队列是一个队列,我们使用单线程生产数据,使用多线程消费数据。由于阻塞队列的特点:队列为空的时候消费者端阻塞,队列满的时候生产者端阻塞。多线程消费数据起到了加速消费的作用,使得生产的数据不会在队列里积压过多,而生产的数据也不会丢失处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容