多线程系列(三)阻塞队列

1 多生产多消费中的缓冲区

什么是多生产多消费中的缓冲区呢?举个例子,上篇文章我们将多生产多消费的时候,只有生产了一辆车才能去消费,这样是不是存在一些弊端?客户X说:我想看一下你们的电动车,销售员:现在不能看,正在生产中!! 客户X:那我买个毛线啊,看都看不了。(゜-゜)

为了解决上面这个问题,我们是不是可以先生产出一批车辆放在车库,等销售员卖出车辆的时候直接从车库取。没错这样就完美解决了上述问题,在Java中我们要引入缓冲区的概念来实现,该缓冲区就相当于车库。

下面我用缓冲区来实现一个多生产多消费的例子
资源类

//资源类
class BoundedDemo{
    private Lock lock = new ReentrantLock();
    //生产者监视器
    private Condition notFull = lock.newCondition();
    //消费者监视器
    private Condition notEmpty = lock.newCondition();
    
    //缓冲区
    private Object[] objs = new Object[100];
    
    private int putptr = 0;//生产者指针
    private int takeptr = 0;//消费者指针
    private int count = 0;//缓冲区大小
    public void put(Object obj) throws InterruptedException{
        lock.lock();
        try{
            //如果缓冲区存满了,将生产者等待
            while(count == objs.length){
                notFull.await();
                
            }
            //将生产的内容放在缓冲区objs中,putptr为存入的角标
            objs[putptr] = obj;
            //当角标取代最后一个时,置为0从头开始再取
            if(++putptr==objs.length){
                putptr = 0;
            }
            //每生产一个count+1
            ++count;
            //唤醒消费者
            notEmpty.signal();
        }finally{
            //中途发生异常必须释放锁
            lock.unlock();
        }
        
    }
    
    public Object take()throws InterruptedException{
        lock.lock();
        try{
            //如果缓冲区没有内容,将消费者等待
            while(count==0){
                notEmpty.await();
            }
            Object obj = objs[takeptr];
            if(++takeptr==objs.length){
                takeptr=0;
            }
            --count;
            notFull.signal();
            return obj;
        }finally{
            //中途发生异常强制释放锁
            lock.unlock();
        }
    }
}

定义一个缓冲区,长度为100,生产者生产的内容往缓冲区里面放,如果缓冲区达到最大长度,则将生产者处于等待状态。消费者从缓冲区中取内容,如果如果缓冲区里没数据则将消费者处于等待状态。

生产者任务

class InputRunnable implements Runnable{

    private BoundedDemo demo;
    private int content = 0;//数据
    public InputRunnable(BoundedDemo demo){
        this.demo = demo;
    }
    
    public void run() {
        // TODO Auto-generated method stub
        
        while(true){
            try {
                demo.put(content++);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}

消费者任务

class OutputRunnable implements Runnable{

    private BoundedDemo demo;
    private int content = 0;//数据
    public OutputRunnable(BoundedDemo demo){
        this.demo = demo;
    }
    
    public void run() {
        // TODO Auto-generated method stub
        
        while(true){
            try {
                System.out.println("消费---"+demo.take());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}

我们用四个线程执行任务看一下打印结果

public class BufferDemo {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        BoundedDemo demo = new BoundedDemo();
        InputRunnable input = new InputRunnable(demo);
        OutputRunnable output = new OutputRunnable(demo);
        Thread t0 = new Thread(input);//生产者线程
        Thread t2 = new Thread(output);//消费者线程
        t0.start();
        t2.start();
    }

}

打印结果

消费---94
消费---95
消费---96
消费---97
消费---98
消费---99
消费---100
消费---101

完美运行,不存在安全问题,而且效率也比之前要高很多。

2 阻塞队列

2.1阻塞队列概念

什么是阻塞队列呢?

  • 支持队列的基本插入功能,如果长度超出了限定长度,则进行等待
  • 支持队列的基本取出功能,如果长度为0,则进行等待

什么意思呢?别急,这时候就用到了上一小节讲的内容,其实上一小节中叙述的缓冲区就是一个数组类型的阻塞队列。“缓冲区就缓冲区呗,你还阻塞队列,说的这么抽象,是为了装X吗?”,大兄弟继续坐下啦喝杯凉茶消消火(゜-゜)。

既然我提到了阻塞队列,说明java中有这么一个概念,有说明java会给我们提供相关API。“你什么意思?”,大兄弟先别激动,就是这么惊喜,以后我们写多生产多消费的时候不用我们霹雳啪啦写上面这么一堆,java给我们定义有阻塞队列,我们直接拿来用就行了。“卧槽,那你前面那么多废话干嘛?糊弄我们呢?",哈哈,学东西我们要知其然更要知其所以然,不是吗,召唤师(゜-゜)?

2.2 BlokingQueue

BlokingQueue是一个接口,它提供了阻塞式队列的一些重要的方法,比如:

  • put(E e)将对象加入到队列中,如果队列没有空间,则会被阻塞直到队列腾出空间,跟小结1中的put(obj)相同
  • offer(E e) 将对象e加入到对队列中,该方法不是阻塞式的,加入成功就返回true反之返回false。
  • offer(E e,lomg time,TimeUnit unit) 更offer(E e)方法作用基本一样,只是增加了一个时间限制,如果没能成功加入队列会等待time时间,等待时间过后扔未能加入返回false,unit为时间单位。
  • poll() 取出队列中首部元素,为非阻塞式方法,如果取不到则返回null。
  • poll(long time,TimeUnit unit) 取出队列中首部元素,如果取不到可以等待time时间,超过time时间仍然取不到返回null。
  • take() 取出队列中首部元素,如果取不到会被一直阻塞直到能取到为止,和小结1中take()相同。

Java为我们开发者提供了七中BlockingQueue的实现类,分别是:ArrayBlockingQueue、LinkedBlockingQueue。PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue、LinkedBlockingDeque。

2.3 ArrayBlockingQueue

ArrayBlockingQueue是内部为数组数据结构的阻塞队列,其实我们在小结1中就是ArrayBlockingQueue的部分源码。

ArrayBlockingQueue的使用如下:
生产者

//生产者
class InputRunnable implements Runnable{

    private BlockingQueue<Integer> queue;
    private int content = 0;//数据
    public InputRunnable(BlockingQueue<Integer> queue){
        this.queue = queue;
    }
    
    public void run() {
        // TODO Auto-generated method stub
        while(true){
            try {
                queue.put(content++);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}

消费者

class OutputRunnable implements Runnable{

    private BlockingQueue<Integer> queue;
    public OutputRunnable(BlockingQueue<Integer> queue){
        this.queue = queue;
    }
    
    public void run() {
        // TODO Auto-generated method stub
        
        while(true){
            try {
                System.out.println("消费---"+queue.take());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }   
}
public static void main(String[] args) {
        // TODO Auto-generated method stub
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100,true);
        InputRunnable input = new InputRunnable(queue);
        OutputRunnable output = new OutputRunnable(queue);
        Thread t0 = new Thread(input);//生产者线程
        Thread t2 = new Thread(output);//消费者线程
        t0.start();
        t2.start();
    }

打印结果

消费---97
消费---98
消费---99
消费---100
消费---101
消费---102
消费---103
消费---104

使用过程中需要注意一点:

  • ArrayBlockingQueue是有界的,必须要明确长度,即够构造函数中第一个参数

通过阻塞队列可以完美避开多生产多消费中安全问题,而且用起来也特别简单。

2.4 LinkedBlockingQueue

LinkedBlockingQueue内部的队列是通过链表数据结构实现的,同样遵循先进先出的原则,使用方法与ArrayBlockingQueue类似,所以就不一一演示了。

注意点

LinkedBlockingQueue内部默认是维护了一个长度为Integer.MAX_VALUE的链表,也就是无限大,此时会出现一个问题,一旦生产速度远大于消费速度会使内部队列的长度快速增长,如不进行控制很容易触发OutOfMemory,所以在使用的时候尽量也定义一个长度,直接在构造函数中传入即可。

2.5 PriorityBlockingQueue

PriorityBlockingQueue是一个无边界优先级队列,默认是按照java规则升序排列,但同时也可以定义Comparator重写compareTo()方法自行定义排序,最后将Comparator对象传入即可。

2.6 DelayQueue

DelayQueue是一个可以延时取出元素的无界队列,内部由一个PriorityQueue进行维护,存储元素时需要实现Delayed接口,指定元素有效期,只有在有效期内才能被取走。

2.7 SynchronousQueue

SynchronousQueue内部是不进行存储元素的(也可以理解为队列长度为1),也就是说当元素被take之后才能进行put,否则会进入阻塞状态,这个跟我们上一节讲的生产者消费者类似,只能对元素一个一个进行操作。

2.8 LinkedTransferQueue

LinkedTransferQueue内部是一个链表数据结构的无界队列,实现了TransferQueue接口,所以具备了几个特性

  • transfer(E e) 如果存在一个正在取出元素的消费者线程,则立刻将元素传递给消费者,如果不存在会将元素插入到队列尾部并进入阻塞状态,直到消费者取走该元素。
  • tryTransfer(E e) 与transfer类似,区别是:如果存在一个正在取出元素的消费者线程,则立刻将元素传递给消费者,如果不存在不会将元素加入到队列并返回false,该方法不是阻塞式方法。

2.9 LinkedBlockingDeque

LinkedBlockingDeque内部是一个双向链表数据结构的阻塞队列,因为是双向的链表,所以具备了头部尾部的增删操作。

2.10 小结

七种阻塞队列使用方式大致相同,其中ArrayBlockingQueue和LinkedBlockingQueue较为常用,如果有数据结构基础的同学建议对每个阻塞队列的源码进行一次通读。

总结

本篇文章的内容相对来说还是比较简单的,七种阻塞队列的差异基本都体现在内部所维护的数据结构,再次重申一遍,如果对数据结构有了解的同学建议读一遍几种阻塞队列的源码,如果对数据结构还不大了解,没关系,我下一个系列写Android网络编程,下下一个系列就写Java中常见的几种数据结构。其实这篇文章是为下一篇文章做铺垫的,下篇文章我给大家带来多线程系列(四)线程池。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容