《生产者与消费者》

志梳理下,生产者消费者模式

简单的模型

先从一个例子开始吧,有一些角色我先声明如下:

  • 餐厅(Restaurant)--->载体
  • 厨师(Chef) --->生产者
  • 服务员(WaiterPerson) --->消费者
  • 食物(Meaf)--->被消费

我梳理一下它们的工作流程:

  • 故事的地点发生在餐厅,它是载体,包括了厨师、服务员、食物。

  • 厨师在餐厅做饭,做完饭,饭放在橱窗,通知服务员端走,送给客人吃完;期间,厨师会不断地监控橱窗的食物是否被端走,如果端走则继续做新的食物,否则等待。

  • 服务员也不能闲着,它时刻留心着橱窗是否有食物上架,如果没有则继续等待。如果有食物则端走,并通知厨师,我端走食物了,你可以做新食物了。

那么按照上面的步骤,首先我们看看生产者Chef的基本代码

   synchronized(this){
       while (restaurant.meal != null) {
              wait(); 
       }
  }

上面代码表示,倘若食物已经做好一份了,厨师不断监控橱窗上面的食物,如果没有被服务员端走(消费),那我厨师就继续等待,多休息一会。注意这里用while而不是if是因为防止多个消费者产生竞争引起并发问题。

 System.out.println("饭做好了,订单生成...")。
  synchronized(restaurant.waiter){
       restaurant.meal=new Meal();
       restaurant.waiter.notifyAll();
 }

上面代码表示 ,厨师没有等待了,他开始做饭(生产),完成生产食物后,厨师通知(notifyAll)正在橱窗等待食物的服务员,叫他去端菜(消费)。

那消费者Waiter的流程呢?我想过程应该是和生产者恰好是对立的。

synchronized(this){
    while(restaurant.meal==null){
           wait();
      }
}

上面代码表示,服务员不断监控橱窗上面的食物有没有做好,如果没有做好,那我服务员就继续等待。 是吧?和前面的生产者的判断条件刚好对立。

 System.out.println("我服务员把饭端走了...")。
 synchronized(restaurant.chef){
       restaurant.meal=null;
       restaurant.chef.notifyAll();
}

上面代码表示 ,服务员被厨师通知端饭(消费)了,于是他开始端饭送个客人,导致橱窗上没有饭了,之后,服务员通知(notifyAll)橱窗口正在等待的厨师去做下一道菜(生产)。

通过上面的例子,我们可以初步了解生产者与消费者的工作模式。但是实际开发场景中,应该有不止一个生产者或者消费者,而且食物应该很多,那么这个时候我们应该引入队列(Queque)这个数据结构来管理它们了。

利用队列管理生产者与消费者

我们可以设想一下,在餐厅中的业务场景,厨师chef应该作为Runable角色可以有多个,我们可以用Excutor.submit(r)提交很多个厨师,让其工作, 而服务员我们也可以有多个,同理,我们也把他放入线程池去运行。 而食物Meal也有多个,并且我们要用一个数据结构存取它,让它作为厨师和服务员两者共同占有的资源又能做好同步处理。在上面的例子中,我们用wait(),notifyAll(),synchronized等方法进行食物的同步与通信。它们有一个明显的缺点,我们发现代码很是耦合,晦涩难懂,暂且不谈性能。

让开发者欣慰的事,JDK中提供了BlockQueque接口来存取“食物”。它是一个阻塞队列的数据结构。在这里,我们需要了解两点;

  • 在开发过程中,"食物"常常指是的IO流。如网络编程中,服务端与客户端发送字节流相互通信。现在有netty或者nio等异步非阻塞IO的框架,让并发性能更佳。

  • 阻塞是为了保证生产者与消费者步调一致,不要产生大量浪费的食物,消费者吃不完,导致资源耗尽。亦或者消费者盲目的去找生产者要食物,太多消费者拥挤,也会消耗资源。所以在刚刚开始的时候,jdk做了这个BlockQueque来管理食物和生产者和消费者通信。 生产者要生产食物如下面的代码:

    @Override
      public void run() {
      try {
          while (!Thread.interrupted()) {
              Meal meal = new Meal(++count);
              mBlockQueque.put(meal);// 如果mBlockQueque容量不为empty则阻塞等待。                                             
              TimeUnit.SECONDS.sleep(2);//模拟生产耗时任务。
           }
        } catch (InterruptedException e) {
          System.out.println("Chef sleep end interrupted...");
          e.printStackTrace();
        }
    }
    

上面mBlockQueque.put()为阻塞方法(如果橱窗(队列)还有食物未被领取,则等待不生产食物,否则生产食物并添加至橱窗),如注释上的说明,它的作用类似wait()/add();我们跟踪下源码:

   /**
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException {@inheritDoc}
     */
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock(); //                                            ---(1)
        try {
            while (!linkFirst(node))
                notFull.await();                                   ---(2)
        } finally {
            lock.unlock();
        }
    }

我解析下上面的代码:put()是一个接口方法,它具体的实现方法之一是putFirst,给链表首位添加一个元素。

  1. (1)此处有lock-finally-unlock组成的临界区。它的作用类似synchronized,用来同步。它们之间不同的地方是:

一、用synchronized声明锁时,任务A和任务B,都要获取锁O,如果A首先获得锁O,B则一直等待直到A释放锁,B一直阻塞着不能被中断。
二、用lock-finally-unlock声明锁时,任务A和任务B,都要获取锁O,如果A首先获得锁O,B可以等待一段时间,不想等待了,可以自行中断。A如果想释放锁必须在finally后调用unlock。所以说我觉得lock更加灵活。
但是在大多数资源竞争不太激烈的情况下,我们还是用synchronized足够了。

  1. (2)此处notFull是Condition的实例。它提供更好的性能,通过await()/signal()方法扮演之前的wait()/notify()的角色。 这里代码是指while判断链表是否超过容量,返回false时,则调用await()阻塞等待当前任务线程。

我们分析完生产者chef,我们来看看消费者waiter的改造后的代码:

@Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Meal meal = mBlockQueque.take();//从队列中remove出一个食物,没有食物则阻塞等待
                meal.run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

上述代码中,mBlockQueque.take()是一个可阻塞方法。它试图从橱窗队列上取食物,如果发现没有食物就阻塞消费者线程。看看take()的具体实现的源码:

 public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null) //                       ---(1)
                notEmpty.await();                                         ---(2)
            return x;
        } finally {
            lock.unlock();
        }
    }

(1)takeFisrt()方法中会有去调用unlinkFirst()去队列返回一个食物,如果有食物,就返回,并调用notFull.signal()唤醒正在阻塞的生产者线程。
(2) notEmpty是另外一个Condition实例,它用来和消费者线程通信。如果发现返回的食物为空,则notEmpty.await()让消费者线程阻塞等待。
至此。我们看到我们把具体的通信交互过程封装到了阻塞队列BlockQueue里。 生产者只需要调用take通信,消费者只需调用put通信。如下图:

通信结构

写到这里了,那生产者与消费者模式有哪些实际应用呢? 我想线程池应该是应用最广泛的地方。下一篇我将详细介绍线程池的原理。

注:部分参考自《Java 编程思想》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容