多线程必考的「生产者 - 消费者」模型,看齐姐这篇文章就够了

<span style="display:block;text-align:center;">这里是《壹齐学多线程》系列的第 3 篇

生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。也是面试中无论中美大厂都非常爱考的一个问题,对应届生问的要少一些,但是对于有工作经验的工程师来说,非常爱考。

这个问题有非常多的版本和解决方式,在本文我重点是和大家壹齐理清思路,由浅入深的思考问题,保证大家看完了都能有所收获。

问题背景

简单来说,这个模型是由两类线程构成:

  • 生产者线程:“生产”产品,并把产品放到一个队列里;

  • 消费者线程:“消费”产品。

image
  • 队列:数据缓存区。

有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

所以该模型实现了生产者和消费者之间的解藕异步

什么是异步呢?

比如说你和你女朋友打电话,就得等她接了电话你们才能说话,这是同步。

但是如果你跟她发微信,并不需要等她回复,她也不需要立刻回复,而是等她有空了再回,这就是异步。

但是呢,生产者和消费者之间也不能完全没有联系的。

  • 如果队列里的产品已经满了,生产者就不能继续生产;

  • 如果队列里的产品从无到有,生产者就得通知一下消费者,告诉它可以来消费了;

  • 如果队列里已经没有产品了,消费者也无法继续消费;

  • 如果队列里的产品从满到不满,消费者也得去通知下生产者,说你可以来生产了。

所以它们之间还需要有协作,最经典的就是使用 Object 类里自带的 wait()notify() 或者 notifyAll() 的消息通知机制。

上述描述中的等着,其实就是用 wait() 来实现的;

通知,就是 notify() 或者 notifyAll()

那么基于这种消息通知机制,我们还能够平衡生产者和消费者之间的速度差异

如果生产者的生产速度很慢,但是消费者消费的很快,就像是我们每月工资就发两次,但是每天都要花钱,也就是 1:15.

那么我们就需要调整生产者(发工资)为 15 个线程,消费者保持 1 个线程,这样是不是很爽~

总结下该模型的三大优点: 解藕,异步,平衡速度差异。

wait()/notify()

接下来我们需要重点看下这个通知机制。

wait()notify() 都是 Java 中的 Object 类自带的方法,可以用来实现线程间的通信。

上一节讲的 11 个 APIs 里我也提到了它,我们这里再展开讲一下。

wait() 方法是用来让当前线程等待,直到有别的线程调用 notify() 将它唤醒,或者我们可以设定一个时间让它自动苏醒。

调用该方法之前,线程必须要获得该对象的对象监视器锁,也就是只能用在加锁的方法下。

而调用该方法之后,当前线程会释放锁。(提示:这里很重要,也是下文代码中用 while 而非 if 的原因。)

notify() 方法只能通知一个线程,如果多个线程在等待,那就唤醒任意一个。

notifyAll() 方法是可以唤醒所有等待线程,然后加入同步队列。

image

这里我们用到了 2 个队列:

  • 同步队列:对应于我们上一节讲的线程状态中的 Runnable,也就是线程准备就绪,就等着抢资源了。

  • 等待队列:对应于我们上一节讲的线程状态中的 Waiting,也就是等待状态。

这里需要注意,从等待状态线程无法直接进入 Q2,而是要先重新加入同步队列,再次等待拿锁,拿到了锁才能进去 Q2;一旦出了 Q2,锁就丢了。

Q2 里,其实只有一个线程,因为这里我们必须要加锁才能进行操作。

实现

这里我首先建了一个简单的 Product 类,用来表示生产和消费的产品,大家可以自行添加更多的 fields

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n873" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">public class Product {
private String name;

public Product(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}</pre>

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n877" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">public class Test {
public static void main(String[] args) {
Queue<Product> queue = new ArrayDeque<>();

for (int i = 0; i < 100; i++) {
new Thread(new Producer(queue, 100)).start();
new Thread(new Consumer(queue, 100)).start();
}
}
}</pre>

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n879" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">public class Producer implements Runnable{
private Queue<Product> queue;
private int maxCapacity;

public Producer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}

@Override
public void run() {
synchronized (queue) {
while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解释
try {
System.out.println("生产者" + Thread.currentThread().getName() + "等待中... Queue 已达到最大容量,无法生产");
wait();
System.out.println("生产者" + Thread.currentThread().getName() + "退出等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (queue.size() == 0) { //队列里的产品从无到有,需要通知在等待的消费者
queue.notifyAll();
}
Random random = new Random();
Integer i = random.nextInt();
queue.offer(new Product("产品" + i.toString()));
System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品:" + i.toString());
}
}
}</pre>

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n882" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;"> public void run() {
synchronized (queue) {
while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解释
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (queue.size() == 0) {
queue.notifyAll();
}
queue.offer(new Product("产品" + i.toString()));
}
}
}</pre>

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n905" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">public class Consumer implements Runnable{
private Queue<Product> queue;
private int maxCapacity;

public Consumer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}

@Override
public void run() {
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("消费者" + Thread.currentThread().getName() + "等待中... Queue 已缺货,无法消费");
wait();
System.out.println("消费者" + Thread.currentThread().getName() + "退出等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (queue.size() == maxCapacity) {
queue.notifyAll();
}

Product product = queue.poll();
System.out.println("消费者" + Thread.currentThread().getName() + "消费了:" + product.getName());
}
}
}</pre>

****我是小齐,纽约程序媛,终生学习者,每天晚上 9 点,云自习室里不见不散!****

这个 Github 汇总了我所有的文章和资料,之后也会一直更新和维护,还希望大家帮忙点个 Star,你们的支持和认可,就是我创作的最大动力,我们下篇文章见!

文中所有代码已经放到了我的 Github 上:https://github.com/xiaoqi6666/NYCSDE

生产者 - 消费者问题是面试中经常会遇到的题目,本文首先讲了该模型的三大优点:解藕,异步,平衡速度差异,然后讲解了等待/通知的消息机制以及在该模型中的应用,最后进行了代码实现。

小结

image

结果如下:

消费者线程是完全对称的,我们来看代码。

总结:在使用线程的等待通知机制时,一般都要在 while 循环中调用 wait() 方法。

因为线程没有一直拿着锁,在被唤醒之后,到拿到锁之间的这段时间里,有可能其他的生产者线程先拿到了锁进行了生产,所以队列又经历了一个从不满到满的过程。

那么为什么可能又满了呢?

  1. 如果队列已满,它就没法生产,那也不能占着位置不做事,所以要把锁让出来,去 Q3 - 等待队列 等着;

  2. 在等待队列里被唤醒之后,不能直接夺过锁来,而是要先加入 Q1 - 同步队列 等待资源;

  3. 一旦抢到资源,关门上锁,才能来到 Q2 继续执行 wait() 之后的活,但是,此时这个队列有可能又满了,所以退出 wait() 之后,还需要再次检查 queue.size() == maxCapacity 这个条件,所以要用 while

其实在这一小段,生产者线程经历了几个过程:

这里有个问题,为什么只能用 while 而不是 if

  1. 生产者线程拿到锁后,其实就是进入了 Q2 阶段。首先检查队列是否容量已满,如果满了,那就要去 Q3 等待;

  2. 如果不满,先检查一下队列原本是否为空,如果原来是空的,那就需要通知消费者;

  3. 最后生产产品。

image

这里有 3 块内容,再对照这个过程来看:

我们把主要逻辑拎出来看:

其实它的主逻辑很简单,我这里为了方便演示加了很多打印语句才显得有点复杂。

然后就是 ProducerConsumer 了。

这里为了更好的理解并发协同的这个过程,我们先自己处理。

BlockingQueue 是阻塞队列,它有一系列的方法可以让线程实现自动阻塞,常用的 BlockingQueue 有很多,后面会单独出一篇文章来讲。

主函数里我设定了两类线程,并且这里选择用普通的 ArrayDeque 来实现 Queue,更简单的方式是直接用 Java 中的 BlockingQueue 来实现。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354