Java多线程之生产者消费者

你们既从罪里得了释放,就做了义的奴仆。---罗马书6:18

概念介绍

生产者--消费者模型是多线程运用的经典案例,其设定了这样一个场景,生产者消费者分属两个不同线程,但它们都共同拥有一个数据缓冲区,这个缓冲区用来平衡生产者消费者处理数据不同步的问题。

拿生活中的例子举例,比如我们去餐馆吃饭,餐馆出菜的速度不一致,前来餐馆的消费者也时多时少,为了减少消费者等待上菜的时间,餐馆在消费者还没有点菜的时候提前就先做一部分菜出来,这提前做出来的菜就可以被看做是缓冲区,新的消费者到来时,餐馆直接从缓冲区拿出现成的菜给消费者,这样就减少了消费者等待菜品的时间,同时我们也要注意提前做多少菜是要有一定限制的,做的太多,会导致菜品放置时间过长,食物变质,做的太少又起不到缓冲的作用。

举例说明

BlockingQueue实现

BlockingQueue实现方式应该是最简单易懂的,主要是因为BlockingQueue这个缓冲区已经实现了锁机制,具体来说就是BlockingQueue能定义缓冲区间的大小,同时这个缓冲区能保证向里添加数据和向外提供数据在同一个时刻只能选择其中一个。生产线程只管向里添加数据,缓冲区满了,会自动block添加动作,同样的消费线程也只管从里面取数据,缓冲区空了,也会自动block取的动作,直到缓冲区有了新的数据。
下面放出代码。

package com.azhengye.test;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProduConsTest {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

class Producer implements Runnable {
    BlockingQueue<Integer> bufferQueue;
    int i = 0;

    Producer(BlockingQueue<Integer> shareQueue) {
        this.bufferQueue = shareQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Random random = new Random();
                int i = random.nextInt();
                //System.out.println("生产者开始生产了");
                bufferQueue.put(i);
                System.out.println("生产了" + i);
            } catch (InterruptedException e1) {
                i--;
                e1.printStackTrace();
            }
        }

    }
}

class Consumer implements Runnable {
    BlockingQueue<Integer> shareQueue;

    Consumer(BlockingQueue<Integer> shareQueue) {
        this.shareQueue = shareQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                //System.out.println("消费者来了");
                int i = shareQueue.take();
                System.out.println("消费了" + i);
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }

    }
}

运行结果如下图

这里写图片描述

这里似乎出现了问题,一开始就出现了消费了1405535437的log,还没有生产呢,消费的数据从哪来的。太不合理了。

别急我们打开注释掉的语句在看看结果


这里写图片描述

配合着打开注释的输出结果,我们体会下为什么出现了第一次那样不合理的结果。

由于Producer跟Consumer分属两个不同的线程,同时启动它俩,系统就会在它们直接来回切换。当Producer线程刚执行完 bufferQueue.put(i); 语句后,恰好系统切换至Consumer线程,此时shareQueue里已经有了数据,于是执行了

int i = shareQueue.take();
System.out.println("消费了" + i);

从而我们就首先看到消费了1405535437的怪异log。从这点我们也可以看到多线程执行顺序的不确定性。

wait()-notify()实现

wait,notifiy以及notifyAll方法都是Object中的方法,要搞清它们的用法,我们必须要弄清楚2个基本原则:

  • wait和notifiy/notifyAll都要在synchronized代码块内部调用
  • wait和notifiy/notifyAll都被同一个对象调用才有意义。

现在来模拟实现一个模型:一个生产者不断向队列插入数据,多个消费者从队列中删除数据。

package com.azhengye.test;

import java.util.LinkedList;
import java.util.Random;

public class ProduConsTestNotifyWaitDemo {
    private static final int BUFF_SIZE = 10;
    private static final int CONSUMER_COUNT = 5;

    public static void main(String[] args) {
        LinkedList<Integer> buffList = new LinkedList<Integer>();
        Producer producer = new Producer(buffList);
        Consumer consumer = new Consumer(buffList);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Thread(consumer).start();
        }
        new Thread(producer).start();
    }
    
    static class Producer implements Runnable {
        private LinkedList<Integer> list = null;

        public Producer(LinkedList<Integer> list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    if (list.size() > BUFF_SIZE) {// 用if判断是个坑
                        try {
                            System.out.println("Producer:"
                                    + Thread.currentThread().getId() + "wait");
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("Producer:"
                            + Thread.currentThread().getId() + "增加了内容为" + i
                            + "的节点");
                    list.add(i);
                    list.notifyAll();
                }
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable {
        private LinkedList<Integer> list = null;

        public Consumer(LinkedList<Integer> list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    if (list.isEmpty()) {// 用if判断是个坑
                        System.out.println("Consumer"
                                + Thread.currentThread().getId() + "wait");
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("Consumer:"
                            + Thread.currentThread().getId() + "移除了内容为"
                            + list.remove() + "的节点");
                    list.notifyAll();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行结果如下图


异常结果

代码注释里已经标记了埋坑点,来仔细分析下这个错误过程:

  1. Consumer8~Consumer12因list未空,没什么要"消费"的,因此都处于wait状态,逻辑合理。
  2. Producer3向list里添加了一个数据,然后调用notifyAll通知1步wait状态的Consumer8~Consumer12别等了,开始等着系统派活了,但系统只会从Consumer8~Consumer12随机抽取一个来干活,Consumer12被抽中了,于是它去干活,Consumer8~Consumer11继续去wait,Consumer12删除了list里唯一的数据,然后也调用notifyAll让处于wait状态的别等了,系统又要重新派活了。到此逻辑也合理。
  3. Consumer8~Consumer11其中的某一个被系统抽中了,假如Consumer8被抽中,由于判断条件为if,Consumer8跳出if语句继续往下执行,注意2步已经使得list为空了,因此Consumer8执行到list.remove() 出现了上图的结果,异常出现了。

为了避免上述错误,在添加一个原则

  • 在while循环里判断操作条件,而不是if。

将if换成while后程序运行正常:


正常运行结果

额外说明:
提下wait跟sleep的区别。表面看起来它们都是停止了当前线程的工作任务,但本质却不同。wait是需要别人去notify才会继续工作,而sleep(n)在n毫秒后自己就醒来了,不用去依赖别人。
synchronized锁住的是对象,因此针对该对象的synchronized方法同一时间只能有一个线程去访问

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容