(逐行注释)带你用4种方法精通生产者——消费者模式

背景

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。


image.png

解决方案

第一种解决方案 使用 synchronized 的 wait() 和 notify() 来实现

关键的思路就是通过 生产者 和 消费者 的不断循环来不断运行。

通过 full 来表示缓冲区的大小,当然此题中没有往缓冲区里面放的过程,可以自行替换成list等,因为已经有了 synchronized(LOCK) 来保护,所以不需要线程安全的集合类。

count用来表示缓冲区中的现有的项目已经生产到了哪里。

没有体现但很重要的是,wait/notify方法的调用必须处在该对象的锁(Monitor)中,也即,在调用这些方法时首先需要获得该对象的锁。否则会抛出IllegalMonitorStateException异常。

这里需要注意的是sleep()不能放在synchronized代码块里面,因为我们知道sleep()执行之后是不会释放锁的,也就是说当前线程仍然持有对container对象的互斥锁,这个时候当前线程继续判断list.size是否等于capacity,不等于就继续put,然后又sleep一会,然后又继续,直到当list.size == capacity,这个时候终于进入wait()方法,我们知道wait()方法会释放锁,这个时候其他线程才有机会获取到container的互斥锁,

notifyAll()不能单独放在producer类里面,因为notifyAll()必须放在同步代码块里面
弊端:这里由于不能区分哪些是not empty或者not full或者is full/empty线程,所以需要唤醒所有其他等待的线程,但实际上我们需要的是唤醒那些not empty或者not full的线程就够了

wait/notify 的机制


import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;

public class LeetCode215 {

    private static Integer count =0;
    private static final int full =10;
    private static final String LOCK="lock";


    public static void main(String[] args){

        System.out.println("test");
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new producer()).start();
    }

    //生产者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //进行不断的循环,来保证一直生产
            while(true){
                //slepp一下线程,保证循环不要执行的太快,浪费性能
                //sleep不能放在同步代码块里面,因为sleep不会释放锁,
                //当前线程会一直占有produce线程,直到达到容量,调用wait()方法主动释放锁
                try{
                    Thread.sleep(1000);
                }catch(Exception e){
                    e.printStackTrace();
                }

                //加锁到本方法上面
                synchronized (LOCK){
                    //缓冲区已经满了,调用wait等待
                    while(count==full) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    //没有满,可以继续生产
                    count++;
                    System.out.println("生产者produce了:"+Thread.currentThread().getName()+" 一共有:"+count);

                    //唤醒其他都处于wait()的线程,包括生产者和消费者
                    LOCK.notifyAll();


                }

            }


        }
    }


    class Consumer implements Runnable{

        public void run(){

            //不断的执行循环,来进行消费数据
            while(true){

                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                //利用锁来控制 消费者 和 生产者的访问
                synchronized (LOCK){
                    //如果缓冲区里面没有数据
                    while(count==0){
                        try{
                            LOCK.wait();
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }

                    //缓冲区有资源了
                    count--;
                    System.out.println("消费者consume了:"+Thread.currentThread().getName()+" 一共有:"+count);
                    LOCK.notifyAll();
                }

            }


        }

    }


}

第二种解决方案 利用重入锁 ReentrantLock()的 await() 和 signalAll() 机制

深入理解条件变量 Condition

ReentrantLock 实现原理

这种方式和 synchroized 方式是基本上一样的

参照 Object 的 wait() 和 notify/notifyAll() 方法,
Condition 也提供了同样的 await() 和 signal/signalAll() 方法。

Condition的await()/signal()方法和Object的wait()/notify()方法

方法ConditionObject阻塞等待await()wait()唤醒其他线程signal()notify()/notifyall()使用的锁互斥锁/共享锁,如Lock同步锁:如synchronized一个锁对应可以创建多个condition对应一个Object唤醒指定的线程明确的指定线程只能通过notifyAll唤醒所有线程;或者notify()随机唤醒

lock和condition实现生产者消费者

该实现方式相比较synchronized于object的wait()/notify()方法具有更加的灵活性,可以唤醒具体的消费者线程或者生产者线程,达到当缓冲区满的时候,唤醒消费者线程,此时生产者线程都将被阻塞,而不是向notifyall()那样唤醒所有的线程。

import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LeetCode215 {

    //使用 ReentrantLock变量来进行
    private final Lock lock = new ReentrantLock();
    //条件变量 通知生产者
    private final Condition isFull = lock.newCondition();
    //条件变量 通知消费者的
    private final Condition isEmpty = lock.newCondition();

    private static int count =0;
    private static final  int num = 10;

    public static void main(String[] args){
        System.out.println("test");
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        
    }

    //生产者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //进行一个循环
            while(true){
                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                //进行加锁
                lock.lock();

                try {
                    //如果缓冲区满了
                    while (count == num) {
                        try {
                            //刮起生产者的线程,暂时不能生产了。
                            isFull.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    //
                    count++;
                    System.out.println("生产者:"+Thread.currentThread().getName()+"产生数据,缓冲区的数量为:"+count);
                    //就可以通知消费者,可以开始消费了
                    isEmpty.signalAll();
                }catch (Exception  e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }


        }

    }


    class Consumer implements Runnable{

        public void run(){

            //不断的执行循环,来进行消费数据
            while(true){

                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                //加锁
                lock.lock();
                try{
                    //如果缓冲区是空的
                    while(count==0){
                        try{
                            //消费者要开始等待了
                            isEmpty.await();
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                    //开始消费
                    count--;
                    System.out.println("消费者:"+Thread.currentThread().getName()+"消费数据,缓冲区的数量为:"+count);
                    //通知所有的生产者线程开始生产数据
                    isFull.signalAll();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }

            }


        }

    }


}

第三种方法 使用BlockingQueue 实现生产者-消费者

BlockingQueue的原理

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作
    因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
    从上可知,阻塞队列是线程安全的。
    下面是BlockingQueue接口的一些方法:


    image.png

这四类方法分别对应的是:

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

下面来看由阻塞队列实现的生产者消费者模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象

ps:下面的代码是生产者和消费者之间的关系是没有问题的,但是count是不对的,因为blockingqueue的put和take操作是线程安全的,后面的num++ 和 num-- 不一定线程安全。

import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LeetCode215 {

    private static BlockingQueue queue = new ArrayBlockingQueue<>(10);
    private static int count  = 0;

    public static void main(String[] args){
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();

    }

    //生产者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //进行一个循环
            while(true){
                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                try {
                    // 往队列里面放入元素,put方法的好处就是如果这个时候blockingqueue已经满了,那么这个线程就会自动阻塞,直到有空闲
                    queue.put(1);
                    //缓冲区数量的标示位
                    count ++;
                    System.out.println("生产者:"+Thread.currentThread().getName()+"生产了数据,缓冲区的数量为:"+count);
                }catch (Exception  e){
                    e.printStackTrace();
                }
            }


        }

    }


    class Consumer implements Runnable{

        public void run(){

            //不断的执行循环,来进行消费数据
            while(true){

                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }


                try{
                    //从blockingqueue里面拿元素,同样如果队列是空,就阻塞了
                    queue.take();
                    //缓冲区的数量剪1
                    count --;
                    System.out.println("消费者:"+Thread.currentThread().getName()+"消费了数据,缓冲区的数量为:"+count);
                }catch (Exception e){
                    e.printStackTrace();
                }

            }


        }

    }


}

第四种方法 使用Semaphore(信号量) 实现

Java并发工具类(信号量Semaphore)

多线程之Semaphore

关键是理解信号量这个操作:

isFull信号量 初始值为10 表示还可以生产10个,生产一个就 acquire 减1,同时 isEmpty 执行 release 加1。

isEmpty信号量 表示还可以消费多少个,初始值为0,表示没有可消费的,每消费一个,就要先 acquire 减 1,同时 isFull 执行 release 加1。

import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LeetCode215 {

    private static int count  = 0;
    //isFull信号量  10 表示缓冲区的数量为10,表示还可以生产多少个
    static Semaphore isFull = new Semaphore(10);
    //isEmpty信号量 表示还可以消费多少个
    static Semaphore isEmpty = new Semaphore(0);
    //互斥锁
    static Semaphore isUse = new Semaphore(1);


    public static void main(String[] args){
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();

    }

    //生产者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //进行一个循环
            while(true){
                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                try {
                    //首先拿到 isFull 信号量,表示
                    isFull.acquire();
                    //拿到控制生产者 和 消费者 的互斥信号量
                    isUse.acquire();
                    //缓冲区数量的标示位
                    count ++;
                    System.out.println("生产者:"+Thread.currentThread().getName()+"生产了数据,缓冲区的数量为:"+count);
                }catch (Exception  e){
                    e.printStackTrace();
                }finally {
                    //释放控制生产者 和 消费者 的互斥信号量
                    isUse.release();
                    //isEmpty的信号量加1,表示缓冲区有数据,可以在消费一个
                    isEmpty.release();
                }
            }


        }

    }


    class Consumer implements Runnable{

        public void run(){

            //不断的执行循环,来进行消费数据
            while(true){

                //消费时间的一个间隔,避免大量的时间用于执行循环
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }


                try{
                    //默认是0,缓冲区里面没数据,要消费数据需要先申请
                    isEmpty.acquire();
                    //拿到控制生产者 和 消费者 的互斥信号量
                    isUse.acquire();
                    //缓冲区的数量减1
                    count --;
                    System.out.println("消费者:"+Thread.currentThread().getName()+"消费了数据,缓冲区的数量为:"+count);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    //释放控制生产者 和 消费者 的互斥信号量
                    isUse.release();
                    //isFull 加1,表示可以再生产1个
                    isFull.release();
                }

            }


        }

    }


}

参考资料

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355