第5章-ProducerConsumer

[TOC]

5.1 模式简介

  1. Producer是 “生产者” 的意思,指的是生成数据的线程。Consumer则是 “消费者” 的意思,指的是使用数据的线程。
  2. 生产者安全地将数据交给消费者。虽然仅是这样看似简单的操作,但当生产者和消费者以不同的线程运行时,两者之间的处理速度差异便会引起问题。例如,消费者想要获取数据,可数据还没生成,或者生产者想要交付数据,而消费者的状态还无法接收数据等。
  3. Producer-Consumer模式在生产者和消费者之间加入了一个 “桥梁角色” 。该桥梁角色用于消除线程间处理速度的差异。
  4. 一般来说,在该模式中,生产者和消费者都有多个,当然生产者和消费者有时也会只有一个。当两者都只有一个时,我们称之为Pipe模式。

5.3 Producer-Consumer模式中的角色

5.3.1 Data

  1. Data角色由Producer角色生成,供Consumer角色使用。

5.3.2 Producer

  1. Producer角色生成Data角色,并将其传递给Channel角色。

5.3.3 Consumer

  1. Consumer角色从Channel角色获取Data角色并使用。

5.3.4 Channel

  1. Channel角色保管从Producer角色获取的Data角色,还会响应Consumer角色的请求,传递Data角色。为了确保安全性,Channel角色会对Producer角色和Consumer角色的访问执行互斥处理。
  2. 当Producer角色将Data角色传递给Channel角色时,如果Channel角色的状态不适合接收Data角色,那么Producer角色将一直等待,直至Channel角色的状态变为可以接收为止。
  3. 当Consumer角色从Channel角色获取Data角色时,如果Channel角色中没有可以传递的Data角色,那么Consumer角色将一直等待,直至Channel角色的状态变为可以传递Data角色为止。
  4. 当存在多个Producer角色和Consumer角色时,为了避免各处理互相影响,Channel角色需要执行互斥处理。这样看来,Channel角色位于Producer角色和Consumer角色之间,承担用于传递Data角色的中转站、通道的任务。

5.3.5 类图

类图

5.4 拓展思路的要点

5.4.1 守护安全性的Channel角色

  1. 在Producer-Consumer模式中,承担安全守护责任的是Channel角色。Channel角色执行线程间的互斥处理,确保Producer角色正确地将Data角色传递给Consumer角色。
  2. 在示例程序中,Table类的put方法和take方法都使用了Guarded Suspension模式。但MakerThread类和EaterThread类并不依赖于Table类的具体实现。也就是说,MakerThread不会顾虑其他线程如何,而是直接调用put方法,同样地,EaterThread也是直接调用take方法。那些使用synchronized、wait和notifyAll等来控制多线程运行的代码,都隐藏在了Channel角色的Table类中。

5.4.2 不可以直接传递吗

  • Producer-Consumer模式为了从Producer角色向Consumer角色传递Data角色,在中间设置了一个Channel角色。那么Producer角色不可以直接调用Consumer角色的方法吗?
  1. 直接调用方法
  • Consumer角色想要获取Data角色,通常都是因为想使用这些Data角色来执行某些处理。如果Producer角色直接调用Consumer角色的方法,那么执行处理的就不是Consumer角色的线程,而是Producer角色的线程了。
  • 这样一来,执行处理花费的时间就必须由Producer角色的线程来承担,准备下一个数据的处理也会相应发生延迟。这样会使程序的响应性变得很差。
  • 直接调用方法就好比糕点师做好蛋糕,直接交给客人,在客人吃完后再做下一个蛋糕一样。
  1. 插入Channel角色
  • 我们再来思考一下插入Channel角色这种方法。Producer角色将Data角色传递给Channel角色之后,无需等待Consumer角色对Data角色进行处理,可以立即开始准备下一个Data角色。也就是说,Producer角色可以持续不断地创建Data角色。Producer角色不会受到Consumer角色的处理进展状况的影响。
  • 当然,虽然可以持续不断地创建Data角色,但也只能是在Channel角色能够储存的范围之内。如果Channel角色中没有剩余空间,那么就无法再添加Data角色了。

5.4.3 存在中间角色的意义

  1. 线程的协调运行要考虑 “放在中间的东西” 。线程的互斥处理要考虑 “应该保护的东西”
  2. 协调运行和互斥处理其实是内外统一的。为了让线程协调运行,必须执行互斥处理,以防止共享的内容被破坏。而线程的互斥处理是为了线程的协调运行才执行的。因此,协调运行和互斥处理之间有着很深的关系。

5.6 理解InterruptedException异常

5.6.1 可能会花费时间,但可以取消

  1. 如果方法后面加了throws InterruptedException,则表明该方法中(或者该方法进一步调用的方法中)可能会抛出InterruptedException异常。
  2. 这包含下面两层含义:
  • 是 “花费时间” 的方法
  • 是 “可以取消” 的方法
  1. 用一句话来说就是,加了throws InterruptedException的方法可能会花费时间,但可以取消。

5.6.2 加了throws InterruptedException的方法

  1. 在Java的标准类库中,加了throws InterruptedException的典型方法有如下三个:
  • java.lang.object类的 wait 方法
  • java.lang.Thread类的 sleep 方法
  • java.lang.Thread类的 join 方法
  1. 花费时间的方法
  • 线程执行wait方法后,会进入等待队列,等待被notify/notifyAll。在等待期间,线程是不运行的,但需要花费时间来等待被notify/notifyAll。
  • 线程执行sleep方法后,会暂停执行(暂停多长时间由参数指定)。这也是花费时间的方法。
  • 线程执行join方法后,会等待指定线程终止。该方法需要花费时间,来等待指定线程终止。
    如上所述,上面这三个方法需要等待 “被notify/notifyAll、指定时间、指定线程终止”,确实是 “花费时间” 的方法。
  1. 可以取消的方法
  • 花费时间的处理会降低程序的响应性,所以如果存在像下面这样可以中途停止执行(取消)的方法,就非常方便了。
  • 取消“wait方法等待notify/notifyAll”的处理
  • 取消“在sleep方法指定的时间内停止执行”的处理
  • 取消“join方法等待其他线程终止”的处理

5.6.3 sleep方法和interrupt方法

  1. 假设线程A因为执行 sleep 正处于暂停状态,想要取消,只能由其他线程来执行该操作,假设为线程B;
  2. 线程B可以执行 A.interrupt() 来中途停止线程A的暂停操作,变量A里保存着与线程A对应的Thread实例。
  3. 这里使用的interrupt方法是Thread类的实例方法。当执行interrupt时,线程并不需要获取 Thread实例的锁。无论何时,任何线程都可以调用其他线程的interrupt方法。
  4. interrupt 方法被调用后,正在sleep的线程会终止暂停状态,抛出InterruptedException异常。此处抛出异常的是线程A。

5.6.4 wait方法和interrupt方法

  1. 在线程A使用wait进行等待时,也可以使用 A.interrupt() 来中途停止线程A的等待操作。

  2. 但在wait的情况下,我们需要注意锁的问题。线程在进入等待队列时,已经释放了锁。当正在wait的线程被调用interrupt方法时(即线程被取消执行时),该线程会在重新获取锁之后,抛出InterruptedException异常。在获取锁之前,线程不会抛出InterruptedException异常

  3. 从让正在wait的线程重新运行这一点来说,notify方法和interrupt方法的作用有些类似,但仍有以下不同:

    方法 方法来源 唤醒对象 是否需要获取实例的锁
    notify/notifyAll java.lang.Object 实例等待队列中的线程
    interrupt java.lang.Thread 指定线程

5.6.5 join方法和interrupt方法

  1. 当线程使用join方法等待其他线程终止时,也可以使用interrupt方法进行取消。由于调用join方法时无需获取锁,所以与使用sleep暂停运行时一样,线程的控制权也会立即跳到catch语句块中。

5.6.6 interrupt 方法只是改变中断状态

  1. 有人也许会认为“当调用interrupt方法时,调用对象的线程就会抛出InterruptedException异常”,其实这是一种误解。实际上,interrupt方法只是改变了线程的中断状态而已。
  2. 所谓中断状态(interrupted status),是一种用于表示线程是否被中断的状态。
  3. 假设当线程Alice执行了sleep、wait、join而停止运行时,线程Bobby调用了Alice的interrupt 方法。这时,线程Alice的确会抛出InterruptedException异常。但这其实是因为sleep、wait、join方法内部对线程的中断状态进行了检查,进而抛出了InterruptedException异常。
  4. 假设线程Alice执行了1+2之类的计算或a=123之类的赋值操作。这时,即使Bobby 调用Alice的interrupt 方法,Alice也不会抛出InterruptedException异常,而是继续执行处理。不仅仅是计算和赋值,for语句、while语句、if语句及方法调用都不会检查中断状态
  5. 如果没有调用sleep、wait、join等方法,或者没有编写检查线程的中断状态并抛出InterruptedException 异常的代码,那么InterruptedException异常就不会被抛出。

5.6.7 isInterrupt方法和Thread.interrupted方法

  1. isInterrupted是Thread类的实例方法,用于检查指定线程的中断状态。该方法不会改变中断状态。
  2. Thread.interrupted是Thread类的静态方法,用于检查并清除当前线程的中断状态。只有这个方法才可以清除中断状态。Thread.interrupted的操作对象是当前线程(即线程本身),所以该方法并不能用于清除其他线程的中断状态

5.7 java.util.concurrent包和Producer-Consumer模式

5.7.1 java.util.concurrent包中的队列

类图
  1. BlockingQueue 接口——阻塞队列

    • BlockingQueue接口表示的是在达到合适的状态之前线程一直阻塞(wait)的队列。
    • BlockingQueue是java.util.Queue接口的子接口,拥有offer方法和poll方法等。但实际上,实现 “阻塞” 功能的方法是BlockingQueue接口固有的put方法和take方法。
    • 由于BlockingQueue是一个接口,所以在实际使用时,需要使用BlockingQueue的实现类。下面列举的就是BlockingQueue的实现类。
  2. ArrayBlockingQueue 类——基于数组的BlockingQueue

    • ArrayBlockingQueue类表示的是元素个数有最大限制的BlockingQueue。当数组满了但仍要put数据时,或者数组为空但仍要take数据时,线程就会阻塞。
  3. LinkedBlockingQueue 类——基于链表的BlockingQueue

    • LinkedBlockingQueue 类表示的是元素个数没有最大限制的BlockingQueue。该类基于链表,如果没有特别指定,元素个数将没有最大限制。只要还有内存,就可以put数据。
  4. PriorityBlockingQueue 类——带有优先级的BlockingQueue

    • PriorityBlockingQueue类表示的是带有优先级的BlockingQueue。数据的 “优先级”是依据Comparable接口的自然排序,或者构造函数的Comparator接口决定的顺序指定的。
  5. DelayQueue类—在一定时间之后才可以 take 的BlockingQueue

    • DelayQueue类表示的是用于储存 java.util.concurrent.Delayed对象的队列。当从该队列take时,只有在各元素指定的时间到期后才可以take。另外,到期时间最长的元素将先被take。
  6. SynchronousQueue 类——直接传递的BlockingQueue

    • SynchronousQueue类表示的是BlockingQueue,该BlockingQueue用于执行由Producer角色到Consumer角色的 “直接传递”。如果Producer角色先put,在Consumer角色take之前,Producer角色的线程将一直阻塞。相反,如果Consumer角色先take,在Producer角色put之前,Consumer角色的线程将一直阻塞。
  7. ConcurrentLinkedQueue类——元素个数没有最大限制的线程安全队

    • ConcurrentLinkedQueue类并不是BlockingQueue的实现类,它表示的是元素个数没有最大限制的线程安全队列。在ConcurrentLinkedQueue中,内部的数据结构是分开的,线程之间互不影响,所以也就无需执行互斥处理。根据线程情况的不同,有时程序的性能也会有所提高。

5.7.2 使用java.util.concurrent.Exchanger类交换缓冲区

Timethreads图
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容