工作三年,小胖问我:什么是生产者消费者模式?菜到抠脚!

生产者消费者模式在我们日常工作中用得非常多,比如:在模块解耦、消息队列、分布式场景中都很常见。这个模式里有三个角色,他们之间的关系是如下图这样的:

图源:Java 并发编程 - 徐隆曦
  • 生产者线程:生产消息、数据
  • 消费者线程:消费消息、数据
  • 阻塞队列:作数据缓冲、平衡二者能力,避免出现"产能过剩"的情况(生产者生产速度远高于消费者消费速度 or 多个生产者对一个消费者)以及"供不应求"的情况(生产者生产速度远低于消费者消费速度 or 多个消费者对一个生产者)

从图中 3 和 4 可以知道:无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就要在合适的时候去唤醒被阻塞的线程。

Q1:那什么时候会唤醒阻塞线程?

  • 1、当消费者判断队列为空时,消费者线程进入等待。这期间生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。

  • 2、反之,当生产者判断队列已满,生产者线程进入等待。这期间消费者一旦消费了数据、队列有空位,就会通知所有的生产者,唤醒阻塞的生产者线程。

Q2:为什么要用这种模式?

看了上面的 Q1,大家发现没有?生产者不用管消费者的动作,消费者也不用管生产者的动作;它两之间就是通过阻塞队列通信,实现了解耦;阻塞队列的加入,平衡二者能力;生产者只有在队列满或消费者只有在队列空时才会等待,其他时间谁抢到锁谁工作,提高效率。以上就是原因~

使用 wait、notify/notifyAll 实现

上篇文章《正确使用 wait、notify/notifyAll》说过,wait 让当前线程等待并释放锁,notify 唤醒任意一个等待同一个锁的线程,notifyAll 则是唤醒所有等待该锁的线程,然后谁抢到锁,谁执行。这就是所谓的等待唤醒机制

先来看看用等待唤醒机制如何实现生产者、消费者模式的,首先是阻塞队列:

public class MyBlockingQueue {

    private int maxSize;
    private LinkedList<Integer> queue;

    public MyBlockingQueue(int size) {
        this.maxSize = size;
        queue = new LinkedList<>();
    }

    public synchronized void put() throws InterruptedException {
        while (queue.size() == maxSize) {
            System.out.println("队列已满,生产者: " + Thread.currentThread().getName() +"进入等待");
            wait();
        }
        Random random = new Random();
        int i = random.nextInt();
        System.out.println("队列未满,生产者: " +
                Thread.currentThread().getName() +"放入数据" + i);

        // 队列空才去唤醒消费者,其他时间自由竞争锁
        if (queue.size() == 0) {
            notifyAll();
        }

        queue.add(i);
    }

    public synchronized void take() throws InterruptedException {
        while (queue.size() == 0) {
            System.out.println("队列为空,消费者: " + Thread.currentThread().getName() +"进入等待");
            wait();
        }

        // 队列满了才去唤醒生产者,其他时间自由竞争锁
        if (queue.size() == maxSize) {
            notifyAll();
        }

        System.out.println("队列有数据,消费者: " +
                Thread.currentThread().getName() +"取出数据: " + queue.remove());
    }

}

主要逻辑在阻塞队列这边:先看 put 方法,while 检查队列是否满?满则进入等待并主动释放锁,不满则生产数据,同时判断放入数据之前队列是否空?空则唤醒消费者(因为队列已有数据,可消费)。

再看 take 方法,while 检查队列是否空?空则进入等待并主动释放锁,不空则生产数据,同时判断取出数据之前队列是否已满?满则唤醒生产者(因为队列已有空位,可生产)。

为什么是 while 不是 if ?

大家可能有个疑问。为什么判断队列 size 进入等待状态这里是用 while,不能用 if 吗?就这个 demo 而言,是可以的。因为我们的生产者和消费者线程都只有一个,但是多线程情况下用 if 就大错特错了。想象以下情况:

  • 假设有两个消费者一个生产者。队列为空,消费者一进入等待状态,释放锁。消费者二抢到锁,进入 if(queue.size == 0) 的判断,也进入等待,释放锁。这时生产者抢到锁生产数据,队列有数据了。反过来唤醒两个消费者。

  • 消费者一抢到锁执行 wait() 后的逻辑,取完数据释放锁。这时消费者二拿到锁,执行 wait() 后的逻辑取数据,但是此时队列的数据已被消费者一取出,没有数据了,这时就会报异常了。

而用 while 为什么可以?因为不管是消费者一还是二抢到锁,循环体的逻辑之前。根据 while 的语法,它会再一次判断条件是否成立,而 if 不会。这就是用 while 不用 if 的原因。

生产者:

public class Producer implements Runnable {

    private MyBlockingQueue myBlockingQueue;

    public Producer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.put();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者:

public class Consumer implements Runnable{

    private MyBlockingQueue myBlockingQueue;

    public Consumer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

测试类:

public class MyBlockingQueueTest {

    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
        Producer producer = new Producer(myBlockingQueue);
        Consumer consumer = new Consumer(myBlockingQueue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }

}

使用 Condition 实现

Condition 是一个多线程间协调通信的工具类,它的 await、sign/signAll 方法正好对应Object 的 wait、notify/notifyAll 方法。相比于 Object 的 wait、notify 方法,Condition 的 await、signal 结合的方式实现线程间协作更加安全和高效,所以更推荐这种方式实现线程间协作。关于 Condition 后面章节会继续研究,敬请关注

Object 的 wait、notify 方式需要结合 synchronized 关键字实现等待唤醒机制,同样 Condition 也需要结合 Lock 类-。那么这种方式如何实现生产者、消费者模式?看代码:

public class MyBlockingQueueForCondition {

    private Queue<Integer> queue;
    private int max = 10;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public MyBlockingQueueForCondition(int size) {
        this.max = size;
        queue = new LinkedList();
    }

    public void put(Integer i) throws InterruptedException {
        // 加锁
        lock.lock();
        try {
            // 队列满了,进入等待
            while (queue.size() == max) {
                System.out.println("队列已满,生产者: " + Thread.currentThread().getName() + "进入等待");
                notFull.await();
            }

            // 加入数据之前,队列为空?通知消费者,可以消费
            if (queue.size() == 0) {
                notEmpty.signalAll();
            }

            // 否则,继续生产
            queue.add(i);
        } finally {
            // 最后别忘记释放锁
            lock.unlock();
        }
    }

    public Integer take() throws InterruptedException {
        // 加锁
        lock.lock();
        try {
            // 队列无数据,进入等待
            while (queue.size() == 0) {
                System.out.println("队列为空,消费者: " + Thread.currentThread().getName() + "进入等待");
                notEmpty.await();
            }

            // 取出数据之前,队列已满?通知生产者,可以生产
            if (queue.size() == max) {
                notFull.signalAll();
            }

            // 否则,取出
            return queue.remove();
        } finally {
            // 最后别忘记释放锁
            lock.unlock();
        }
    }
}

首先,定义了一个队列以及 ReentrantLock 类型的锁,在这基础上还创建 notFull、notEmpty 两个条件,分别代表未满、不为空的条件。最后定义了 take、put 方法。

take 和 put 逻辑差不多,这里只说 put 。因为消费生产模式肯定用于多线程环境,需要保证同步。这里还是先获取锁,确保同步。之后依然是判断队列是否已满?满了进入等待并释放锁,不满则继续生产,同时判断队列在生产前是否为空,为空才去唤醒消费者。否则不唤醒,因为当队列为空消费者才进入阻塞

PS:最后是一个非常重要的细节,在 finally 里面释放锁,否则有可能出现异常无法释放锁的情况

生产者:

public class ProducerForCondition implements Runnable {

    private MyBlockingQueueForCondition myBlockingQueueForCondition;

    public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
        this.myBlockingQueueForCondition = myBlockingQueueForCondition;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueueForCondition.put(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者:

public class ConsumerForCondition implements Runnable{

    private MyBlockingQueueForCondition myBlockingQueueForCondition;

    public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
        this.myBlockingQueueForCondition = myBlockingQueueForCondition;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                System.out.println("消费者取出数据: " + myBlockingQueueForCondition.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

测试类:

public class MyBlockingQueueForConditionTest {

    public static void main(String[] args) {
        MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10);
        ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition);
        ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition);
        new Thread(producerForCondition).start();
        new Thread(consumerForCondition).start();
    }

}

使用 BlockingQueue 实现

看完前两种方式之后,有些小伙伴可能会说,实现个生产者消费者这么烦么?其实主要代码还是在阻塞队列,这点 Java 早就为我们考虑好了,它提供了 BlockingQueue 接口,并有实现类: ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、等。(关于阻塞队列,狗哥的多线程系列后面也会讲到

我们选用最简单的 ArrayBlockingQueue 实现。它的内部也是采取 ReentrantLock 和 Condition 结合的等待唤醒机制。所以,上面的两种方式其实是为这种方式铺垫。不多比比,上代码:

public class ArrayBlockingQueueTest {

    public static void main(String[] args) {
        // 初始化长度为 10 的 ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        // 生产者
        Runnable producer = () -> {
            try {
                // 放入数据
                Random random = new Random();
                while (true) {
                    queue.put(random.nextInt());
                }
            } catch (Exception e) {
                System.out.println("生产数据出错: " + e.getMessage());
            }
        };
        // 开启线程生产数据
        new Thread(producer).start();

        // 消费者
        Runnable consumer = () -> {
            try {
                // 取出数据
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (Exception e) {
                System.out.println("消费数据出错: " + e.getMessage());
            }
        };
        // 开启线程消费数据
        new Thread(consumer).start();
    }

}

创建一个 ArrayBlockingQueue 并给定最大长度为 10,创建生产者和消费者。生产者在 while(true) 里面一直生产,与此同时消费者也是不断取数据,有数据就取出来。

看着是不是很简单?但其实背后 ArrayBlockingQueue 已经为我们做好了线程间通信的工作了,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等

巨人的肩膀

总结

看了这几个例子之后,相信你对生产者消费者模式也有所了解。以后面试官让你手写一个阻塞队列,肯定也难不倒你。

小福利

如果看到这里,喜欢这篇文章的话,请帮点个好看。微信搜索一个优秀的废人,关注后回复电子书送你 100+ 本编程电子书 ,不只 Java 哦,详情看下图。回复 1024送你一套完整的 java 视频教程。

资源
C语言
C++
Java
Git
Python
GO
Linux
经典必读
面试相关
前端
人工智能
设计模式
数据库
数据结构与算法
计算机基础
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容