[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也。—《论语》

Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue。Queue 用来临时保存一组等待处理的元素。BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作。

BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型。


这里写图片描述

生产者线程持续生产新对象并插入队列,如果队列已满,那么插入对象的操作会一直阻塞,直到队列中出现可用的空间。
消费者线程持续从队列获取对象,如果队列为空,那么获取操作会一直阻塞,直到队列中出现可用的新对象。BlockingQueue 简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。

BlockingQueue 的核心方法:

这里写图片描述

offer(E): 向队列插入元素,并返回插入成功与否。本方法不阻塞当前执行线程。

put(E) : 向队列插入元素,如果队列已满,则会阻塞当前线程直至元素加入队列。

take() : 获取队列的首位元素,如果队列为空,则阻塞当前线程直至队列有元素并取走。

poll():获取队列首个元素,指定时间内一旦数据可取,则立即返回;否则返回失败。

remove(E):删除队列中的元素,返回成功与否。

BlockingQueue 的实现

BlockingQueue是一个接口,所以你必须使用它的实现来使用它。它的实现包括以下几个:

  • ArrayBlockingQueue:基于数组实现的有界队列(FIFO),使用一把全局锁并行对 queue 读写操作,同时使用两个 Condition 阻塞队列为空时的取操作和队列为满时的写操作。

  • LinkedBlockingQueue:基于已链接节点的,范围上限为 Integer.MAX_VALUE 的 blocking queue(FIFO)。主要操作 put 和 take 都是阻塞的。

  • DelayQueue:当指定的延迟时间到了,才能够从队列中获取元素。它没有大小限制,因此插入元素时不会阻塞,而只有获取元素时才会被阻塞。它的用法可以参考下面两篇博客:
    http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html
    http://www.cnblogs.com/sunzhenchao/p/3515085.html

  • PriorityBlockingQueue: 基于优先级的阻塞队列,但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。

  • SynchronoutQueue:它的内部同时只能够容纳单个元素。如果该队列已有一个元素的话,试图向队列插入一个新元素线程会阻塞,知道另一个线程将该元素从队列中拿走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,知道另一个线程向队列中插入一个新的元素。SynchronousQueue适合一对一的匹配场景,没有容量,无法缓存。它的用法强烈推荐博客:
    http://www.cnblogs.com/leesf456/p/5560362.html

BlockingQueue的使用

这是一个使用 BlockingQueue 的例子,本例使用 ArrayBlockingQueue 实现。首先,BlockingQueueTest 创建一个生产者线程 Procucer, 把字符存放进共享队列。然后创建三个消费者线程 Consumer,把字符串从队列中取出。Consumer 取到最后一个字符串时,中断所有消费者线程,结束程序。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
    //队列容量
    private static final int SIZE = 5;
    private static final int CONSUMER_SIZE = 3;
    //消费者线程退出标志
    private static String endString = "num:" + (SIZE*2-1);
    //存放消费者线程
    private static List list = new ArrayList<Thread>();
    
    public static void main(String[] args) throws Exception{
        //创建固定长度的阻塞队列
        BlockingQueue q = new ArrayBlockingQueue<String>(SIZE);
        
        //创建生产者
        Producer producer = new Producer(q);
        //启动生产者线程,生产对象
        producer.start();
        
        //启动消费者线程,获取队列对象
        for(int i = 0; i < CONSUMER_SIZE; i++) {
            list.add(new Consumer(q));
            ((Thread) list.get(i)).start();
        }
    }
    
    //中断线程
    public static void shutDownThread() {
        for(int i = 0; i < CONSUMER_SIZE; i++) {
            ((Thread) list.get(i)).interrupt();
        }
    }

    static class Producer extends Thread{

        private BlockingQueue queue = null;
        
        public Producer(BlockingQueue q) {
            this.queue = q;
        }
        
        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                //生产10个对象,放进队列
                for(int i = 0; i < SIZE*2; i++) {
                    String str = "num:" + i;
                    System.out.println(Thread.currentThread().getName() +":"+"IN: " + str);
                    queue.put(str);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
    }
    
    //消费者线程
    static class Consumer extends Thread{

        private BlockingQueue queue = null;
        
        public Consumer(BlockingQueue q) {
            this.queue = q;
        }
        
        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                //获取队列的元素,队列为空时会阻塞
                while(true) {
                    String str = (String) queue.take();
                    System.out.println(Thread.currentThread().getName() + ":" + "OUT " + str);
                    //已经取出最后一个元素,消费者线程应该退出,否则程序一直在运行
                    if(str.equals(endString)) {
                        shutDownThread();//中断线程
                    }
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}


执行结果:

这里写图片描述

总结

java.util.concurrent 中实现的阻塞队列包含了足够的同步机制,从而能够安全的将对象从生产者线程发布到消费者线程。对于可变对象,生产者-消费者模型,把对象的所有权安全的从生产者交付给消费者。转移后,消费者线程获得这个对象的所有权(独占访问权,可以任意修改它),并且生产者线程不会再访问它。同时,阻塞队列负责所有的控制流,使得消费者生产者的代码更加简单和清晰。

本文完毕,如对你有帮助,请关注我,谢谢~

参考

  1. https://blog.csdn.net/defonds/article/details/44021605/
  2. http://www.cnblogs.com/leesf456/p/5428630.html
  3. 《并发编程实战》
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容