[TOC]
5.1 模式简介
- Producer是 “生产者” 的意思,指的是生成数据的线程。Consumer则是 “消费者” 的意思,指的是使用数据的线程。
- 生产者安全地将数据交给消费者。虽然仅是这样看似简单的操作,但当生产者和消费者以不同的线程运行时,两者之间的处理速度差异便会引起问题。例如,消费者想要获取数据,可数据还没生成,或者生产者想要交付数据,而消费者的状态还无法接收数据等。
- Producer-Consumer模式在生产者和消费者之间加入了一个 “桥梁角色” 。该桥梁角色用于消除线程间处理速度的差异。
- 一般来说,在该模式中,生产者和消费者都有多个,当然生产者和消费者有时也会只有一个。当两者都只有一个时,我们称之为Pipe模式。
5.3 Producer-Consumer模式中的角色
5.3.1 Data
- Data角色由Producer角色生成,供Consumer角色使用。
5.3.2 Producer
- Producer角色生成Data角色,并将其传递给Channel角色。
5.3.3 Consumer
- Consumer角色从Channel角色获取Data角色并使用。
5.3.4 Channel
- Channel角色保管从Producer角色获取的Data角色,还会响应Consumer角色的请求,传递Data角色。为了确保安全性,Channel角色会对Producer角色和Consumer角色的访问执行互斥处理。
- 当Producer角色将Data角色传递给Channel角色时,如果Channel角色的状态不适合接收Data角色,那么Producer角色将一直等待,直至Channel角色的状态变为可以接收为止。
- 当Consumer角色从Channel角色获取Data角色时,如果Channel角色中没有可以传递的Data角色,那么Consumer角色将一直等待,直至Channel角色的状态变为可以传递Data角色为止。
- 当存在多个Producer角色和Consumer角色时,为了避免各处理互相影响,Channel角色需要执行互斥处理。这样看来,Channel角色位于Producer角色和Consumer角色之间,承担用于传递Data角色的中转站、通道的任务。
5.3.5 类图
5.4 拓展思路的要点
5.4.1 守护安全性的Channel角色
- 在Producer-Consumer模式中,承担安全守护责任的是Channel角色。Channel角色执行线程间的互斥处理,确保Producer角色正确地将Data角色传递给Consumer角色。
- 在示例程序中,Table类的put方法和take方法都使用了Guarded Suspension模式。但MakerThread类和EaterThread类并不依赖于Table类的具体实现。也就是说,MakerThread不会顾虑其他线程如何,而是直接调用put方法,同样地,EaterThread也是直接调用take方法。那些使用synchronized、wait和notifyAll等来控制多线程运行的代码,都隐藏在了Channel角色的Table类中。
5.4.2 不可以直接传递吗
- Producer-Consumer模式为了从Producer角色向Consumer角色传递Data角色,在中间设置了一个Channel角色。那么Producer角色不可以直接调用Consumer角色的方法吗?
- 直接调用方法
- Consumer角色想要获取Data角色,通常都是因为想使用这些Data角色来执行某些处理。如果Producer角色直接调用Consumer角色的方法,那么执行处理的就不是Consumer角色的线程,而是Producer角色的线程了。
- 这样一来,执行处理花费的时间就必须由Producer角色的线程来承担,准备下一个数据的处理也会相应发生延迟。这样会使程序的响应性变得很差。
- 直接调用方法就好比糕点师做好蛋糕,直接交给客人,在客人吃完后再做下一个蛋糕一样。
- 插入Channel角色
- 我们再来思考一下插入Channel角色这种方法。Producer角色将Data角色传递给Channel角色之后,无需等待Consumer角色对Data角色进行处理,可以立即开始准备下一个Data角色。也就是说,Producer角色可以持续不断地创建Data角色。Producer角色不会受到Consumer角色的处理进展状况的影响。
- 当然,虽然可以持续不断地创建Data角色,但也只能是在Channel角色能够储存的范围之内。如果Channel角色中没有剩余空间,那么就无法再添加Data角色了。
5.4.3 存在中间角色的意义
- 线程的协调运行要考虑 “放在中间的东西” 。线程的互斥处理要考虑 “应该保护的东西” 。
- 协调运行和互斥处理其实是内外统一的。为了让线程协调运行,必须执行互斥处理,以防止共享的内容被破坏。而线程的互斥处理是为了线程的协调运行才执行的。因此,协调运行和互斥处理之间有着很深的关系。
5.6 理解InterruptedException异常
5.6.1 可能会花费时间,但可以取消
- 如果方法后面加了throws InterruptedException,则表明该方法中(或者该方法进一步调用的方法中)可能会抛出InterruptedException异常。
- 这包含下面两层含义:
- 是 “花费时间” 的方法
- 是 “可以取消” 的方法
- 用一句话来说就是,加了throws InterruptedException的方法可能会花费时间,但可以取消。
5.6.2 加了throws InterruptedException的方法
- 在Java的标准类库中,加了throws InterruptedException的典型方法有如下三个:
- java.lang.object类的 wait 方法
- java.lang.Thread类的 sleep 方法
- java.lang.Thread类的 join 方法
- 花费时间的方法
- 线程执行wait方法后,会进入等待队列,等待被notify/notifyAll。在等待期间,线程是不运行的,但需要花费时间来等待被notify/notifyAll。
- 线程执行sleep方法后,会暂停执行(暂停多长时间由参数指定)。这也是花费时间的方法。
- 线程执行join方法后,会等待指定线程终止。该方法需要花费时间,来等待指定线程终止。
如上所述,上面这三个方法需要等待 “被notify/notifyAll、指定时间、指定线程终止”,确实是 “花费时间” 的方法。
- 可以取消的方法
- 花费时间的处理会降低程序的响应性,所以如果存在像下面这样可以中途停止执行(取消)的方法,就非常方便了。
- 取消“wait方法等待notify/notifyAll”的处理
- 取消“在sleep方法指定的时间内停止执行”的处理
- 取消“join方法等待其他线程终止”的处理
5.6.3 sleep方法和interrupt方法
- 假设线程A因为执行 sleep 正处于暂停状态,想要取消,只能由其他线程来执行该操作,假设为线程B;
- 线程B可以执行
A.interrupt()
来中途停止线程A的暂停操作,变量A里保存着与线程A对应的Thread实例。 - 这里使用的interrupt方法是Thread类的实例方法。当执行interrupt时,线程并不需要获取 Thread实例的锁。无论何时,任何线程都可以调用其他线程的interrupt方法。
- interrupt 方法被调用后,正在sleep的线程会终止暂停状态,抛出InterruptedException异常。此处抛出异常的是线程A。
5.6.4 wait方法和interrupt方法
在线程A使用wait进行等待时,也可以使用
A.interrupt()
来中途停止线程A的等待操作。但在wait的情况下,我们需要注意锁的问题。线程在进入等待队列时,已经释放了锁。当正在wait的线程被调用interrupt方法时(即线程被取消执行时),该线程会在重新获取锁之后,抛出InterruptedException异常。在获取锁之前,线程不会抛出InterruptedException异常。
-
从让正在wait的线程重新运行这一点来说,notify方法和interrupt方法的作用有些类似,但仍有以下不同:
方法 方法来源 唤醒对象 是否需要获取实例的锁 notify/notifyAll java.lang.Object 实例等待队列中的线程 是 interrupt java.lang.Thread 指定线程 否
5.6.5 join方法和interrupt方法
- 当线程使用join方法等待其他线程终止时,也可以使用interrupt方法进行取消。由于调用join方法时无需获取锁,所以与使用sleep暂停运行时一样,线程的控制权也会立即跳到catch语句块中。
5.6.6 interrupt 方法只是改变中断状态
- 有人也许会认为“当调用interrupt方法时,调用对象的线程就会抛出InterruptedException异常”,其实这是一种误解。实际上,interrupt方法只是改变了线程的中断状态而已。
- 所谓中断状态(interrupted status),是一种用于表示线程是否被中断的状态。
- 假设当线程Alice执行了sleep、wait、join而停止运行时,线程Bobby调用了Alice的interrupt 方法。这时,线程Alice的确会抛出InterruptedException异常。但这其实是因为sleep、wait、join方法内部对线程的中断状态进行了检查,进而抛出了InterruptedException异常。
- 假设线程Alice执行了1+2之类的计算或a=123之类的赋值操作。这时,即使Bobby 调用Alice的interrupt 方法,Alice也不会抛出InterruptedException异常,而是继续执行处理。不仅仅是计算和赋值,for语句、while语句、if语句及方法调用都不会检查中断状态。
- 如果没有调用sleep、wait、join等方法,或者没有编写检查线程的中断状态并抛出InterruptedException 异常的代码,那么InterruptedException异常就不会被抛出。
5.6.7 isInterrupt方法和Thread.interrupted方法
- isInterrupted是Thread类的实例方法,用于检查指定线程的中断状态。该方法不会改变中断状态。
- Thread.interrupted是Thread类的静态方法,用于检查并清除当前线程的中断状态。只有这个方法才可以清除中断状态。Thread.interrupted的操作对象是当前线程(即线程本身),所以该方法并不能用于清除其他线程的中断状态。
5.7 java.util.concurrent包和Producer-Consumer模式
5.7.1 java.util.concurrent包中的队列
-
BlockingQueue 接口——阻塞队列
- BlockingQueue接口表示的是在达到合适的状态之前线程一直阻塞(wait)的队列。
- BlockingQueue是java.util.Queue接口的子接口,拥有offer方法和poll方法等。但实际上,实现 “阻塞” 功能的方法是BlockingQueue接口固有的put方法和take方法。
- 由于BlockingQueue是一个接口,所以在实际使用时,需要使用BlockingQueue的实现类。下面列举的就是BlockingQueue的实现类。
-
ArrayBlockingQueue 类——基于数组的BlockingQueue
- ArrayBlockingQueue类表示的是元素个数有最大限制的BlockingQueue。当数组满了但仍要put数据时,或者数组为空但仍要take数据时,线程就会阻塞。
-
LinkedBlockingQueue 类——基于链表的BlockingQueue
- LinkedBlockingQueue 类表示的是元素个数没有最大限制的BlockingQueue。该类基于链表,如果没有特别指定,元素个数将没有最大限制。只要还有内存,就可以put数据。
-
PriorityBlockingQueue 类——带有优先级的BlockingQueue
- PriorityBlockingQueue类表示的是带有优先级的BlockingQueue。数据的 “优先级”是依据Comparable接口的自然排序,或者构造函数的Comparator接口决定的顺序指定的。
-
DelayQueue类—在一定时间之后才可以 take 的BlockingQueue
- DelayQueue类表示的是用于储存 java.util.concurrent.Delayed对象的队列。当从该队列take时,只有在各元素指定的时间到期后才可以take。另外,到期时间最长的元素将先被take。
-
SynchronousQueue 类——直接传递的BlockingQueue
- SynchronousQueue类表示的是BlockingQueue,该BlockingQueue用于执行由Producer角色到Consumer角色的 “直接传递”。如果Producer角色先put,在Consumer角色take之前,Producer角色的线程将一直阻塞。相反,如果Consumer角色先take,在Producer角色put之前,Consumer角色的线程将一直阻塞。
-
ConcurrentLinkedQueue类——元素个数没有最大限制的线程安全队
- ConcurrentLinkedQueue类并不是BlockingQueue的实现类,它表示的是元素个数没有最大限制的线程安全队列。在ConcurrentLinkedQueue中,内部的数据结构是分开的,线程之间互不影响,所以也就无需执行互斥处理。根据线程情况的不同,有时程序的性能也会有所提高。