多线程设计模式:生产者-消费者(Producer-Consumer)

场景

生产者生产数据,消费者消费数据;
但是性能处理速度均有差异,因而需要一个中间队列协调;

举例

3个厨师做甜点,有3个吃货来吃,如果厨师和吃货一对一,厨师的生产速度和吃货的吃饭速度均成成了短板,可能存在这样一种不协调的场景;

  • 慢性子厨师款待一个吃饭快的胖吃货,吃货都要饿死了,只能死等;
  • 急性子厨师款待一个吃饭慢的瘦子吃货(比如我女朋友),吃货吃不完,堆了一堆甜点

这个时候怎么办比较好呢?
加一个橱柜,糕点生产好一个一个放在橱柜里,吃货排队一个一个拿,这样是不是好多了呢,橱柜协调了厨师和吃货的关系,减少了浪费;

角色

生产者 厨师
消费者 吃货
数据 糕点
中间队列 桌子

提升与思考

代码放在最后,先讲几个问题深入大家理解

  1. 队列可以有多种实现(Strategy Pattern)
  • 先放先出 FIFO
  • 后放先出 LIFO
  • 优先级队列 Priority Quene
  1. 为什么生产和消费要用多线程?
    这也是我在实际应用中感受最深的,答案只有一个,单线程太耗时
    只有当多线程的效率提升可以抵消开发难度和性能消耗时才有必要用多线程
  2. 别忘记sychronized 和 notifyAll()
    否则其他线程一直等待,不会继续
  3. 生产线程和消费线程的数量匹配
    上述问题可以只有一个生产线程和一个消费线程(PIPE模式)
    当生产比较费时的时候生产线程多一些,消费耗时时(如网络IO)多一些消费线程(一对多对应线程池的WorkThread设计模式)
  • 当厨师效率较低时(Thread.sleep(1000))
    面包总是被吃光,可以增加厨师或者减少吃货

面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
厨师乙 在桌子上放入了 厨师乙产生的第1个面包
吃货甲 从桌子上取走了 厨师乙产生的第1个面包
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
厨师甲 在桌子上放入了 厨师甲产生的第2个面包
吃货丙 从桌子上取走了 厨师甲产生的第2个面包
面包被吃货吃光,等待厨师生产
厨师丙 在桌子上放入了 厨师丙产生的第3个面包
吃货乙 从桌子上取走了 厨师丙产生的第3个面包
面包被吃货吃光,等待厨师生产
厨师乙 在桌子上放入了 厨师乙产生的第4个面包
吃货甲 从桌子上取走了 厨师乙产生的第4个面包
面包被吃货吃光,等待厨师生产
厨师丙 在桌子上放入了 厨师丙产生的第5个面包
吃货丙 从桌子上取走了 厨师丙产生的第5个面包
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产

  • 当厨师效率较高时(Thread.sleep(500)时)
    总是放不下,这时可以减少厨师或者增加吃货

厨师丙 在桌子上放入了 厨师丙产生的第18个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
吃货丙 从桌子上取走了 厨师甲产生的第11个面包
厨师乙 在桌子上放入了 厨师乙产生的第7个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
吃货乙 从桌子上取走了 厨师丙产生的第16个面包
厨师丙 在桌子上放入了 厨师丙产生的第19个面包
桌子放满了面包,不能再放了
吃货丙 从桌子上取走了 厨师丙产生的第18个面包
厨师甲 在桌子上放入了 厨师甲产生的第17个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
吃货乙 从桌子上取走了 厨师乙产生的第7个面包
厨师甲 在桌子上放入了 厨师甲产生的第22个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了

所以中间队列给我们提供了一个缓冲区,用于协调生产者和消费者间的性能差异,控制互斥

总结:

  1. 线程的协调运行需要"放在中间的东西"
  2. 线程的互斥处理需要"保护中间的东西"
  3. 中间队列使得两端协调成为肯能

代码

  • 生产者
/**
 * @author xuhe
 * @description 生产者线程
 * @date 2018/5/17
 */
public class ProducerThread extends Thread {

    public static int index;
    private String threadName;
    private DataChannel dataChannel;
    private Random random;

    public ProducerThread(String threadName, DataChannel dataChannel, long randomSeed) {
        this.threadName = threadName;
        this.dataChannel = dataChannel;
        this.random = new Random(randomSeed);
        setName(threadName);
    }


    @Override
    public void run() {
        try {
            while (true){
                Thread.sleep(random.nextInt(500));
                Data data= new Data();
                data.setName(String.format("%s产生的第%s个面包",threadName,getIndex()));
                dataChannel.put(data);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private synchronized int getIndex(){
        return ++index;
    }
}
  • 消费者
package com.microparticletech.producer_consumer;

import java.util.Random;

/**
 * @author xuhe
 * @description 消费者线程
 * @date 2018/5/17
 */
public class ConsumerThread extends Thread {

    private DataChannel dataChannel;
    private String threadName;
    private Random random;

    public ConsumerThread(String threadName,DataChannel dataChannel, long randomSeed) {
        this.threadName = threadName;
        this.dataChannel=dataChannel;
        this.random = new Random(randomSeed);
        setName(threadName);
    }


    @Override
    public void run() {
        System.out.println(threadName+"启动");
        try {
            while (true){
                Data data=new Data();
                data=dataChannel.get();
                Thread.sleep(random.nextInt(1000));
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  • 数据

@lombok.Data
public class Data {
    private String name;

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return
                "name='" + name + '\'';
    }
}
  • 中间队列
public interface DataChannel {

     void  put(Data data) throws InterruptedException;

    Data get() throws InterruptedException;

}

public class DataChannelFIFOImpl implements DataChannel {
//先进先出的队列实现
    private static final int CAPACITY=3;
    private Data[] dataArray = new Data[CAPACITY];
    private int head;
    private int tail;
    private int count;

    @Override
    public synchronized void put(Data data) throws InterruptedException {
        while (count>=CAPACITY){
            System.out.println("桌子放满了面包,不能再放了");
            wait();
        }
        System.out.println(String.format("%s 在桌子上放入了 %s",Thread.currentThread().getName(),data.getName()));
        dataArray[tail]=data;
        tail=(tail+1)%CAPACITY;
        count++;
        notifyAll();
    }

    @Override
    public synchronized Data get() throws InterruptedException {
        while (count<=0){
            System.out.println("面包被吃货吃光,等待厨师生产");
            wait();
        }
        Data data=dataArray[head];
        System.out.println(String.format("%s 从桌子上取走了 %s",Thread.currentThread().getName(),data.getName()));
        count--;
        head=(head+1)%CAPACITY;
        notifyAll();
        return data;
    }
}

  • 启动类
public class MainTest {

    public static void main(String[] args) {
        int size=3;
        DataChannel channel = new DataChannelFIFOImpl();

        Thread producer1 = new ProducerThread("厨师甲" , channel, 31415);
        Thread producer2 = new ProducerThread("厨师乙" , channel, 92653);
        Thread producer3 = new ProducerThread("厨师丙" , channel, 58979);
        producer1.start();
        producer2.start();
        producer3.start();

        Thread consumer1 = new ConsumerThread("吃货甲" , channel, 32384);
        Thread consumer2 = new ConsumerThread("吃货乙" , channel, 62643);
        Thread consumer3 = new ConsumerThread("吃货丙" , channel, 38327);
        consumer1.start();
        consumer2.start();
        consumer3.start();
        
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • 前言 《四柱特训班讲义》一书,是笔者根据2003年春举办的四柱特训班讲课记录的基础上整理出来的。它是以《四柱详真》...
    小狐狸娃娃阅读 11,802评论 1 29
  • 冬天的夜晚非常的冷,小女孩儿独自在大街上走着,没人给她钱,也没人给她吃的,她就这样,孤零零的走着,在快天黑的时候...
    雨_Lyj阅读 673评论 0 0
  • 很多职场新人都会遇到以下3个问题:感觉每天都很多事情要干,还不知道从何干起;所做的事情都很零碎,却依然没法按时完成...
    缘小异阅读 2,042评论 0 1
  • 寿光世纪教育集团东城初中 马宗国 “这种教学评的一致性研究和我们的课程标准校本化研究是相通的,给了我们很多...
    从心而觅阅读 708评论 0 1
  • 1、指导型的学习与自我发现型的学习 自我发现型学习就是没有老师指导的学习方式,学习者是立足于自然或世界,来阅读自我...
    飞鹰于凯阅读 214评论 0 1