在设计模式的专题中,我们着重介绍了
GOF 23种设计模式
,但是除了这23种设计模式,还有其它在我们日常编程种广泛使用的设计模式或者说套路
。本文我们就来探讨下生产者消费者模式
。本文主要包括以下部分:
- 前言
- 生产者/消费者模式
2.1 定义
2.2 四种实现
2.2.1 等待/通知模式
2.2.2 Condition
2.2.3 BlockingQueue
2.2.4 管道输入/输出流- 总结
1. 前言
生产者消费者模型,作为理解线程间通讯的经典范例。理解并掌握它,不仅仅有助于我们在面试中PK面试官的无情组合拳,更有利于我们理解线程间通讯的原理,对于并发编程而言也是大有益处的。
2. 生产者/消费者模式
2.1 定义
那么什么是生产者消费者模式呢?
生产者消费者模式是多线程下线程间的一种协作模式。
一部分线程负责生产数据即生产者,一部分线程负责消费数据即消费者。
生产者消费者之间并不直接关联,而是通过共享的缓冲区进行通信。
生产者往缓冲区里输入数据,当缓冲区满时,生产者阻塞。
-
消费者从缓冲区里拉取数据,当缓冲区为空时,消费者阻塞。
可以看到,生产者和消费者之间并不是直接依赖,而是通过第三方组件即缓冲区保持关联。那么这么做有什么好处呢?
-
解耦
。我们常说的程序应该保持高内聚低耦合,这里就是低耦合的体现。生产者和消费者并不直接依赖。换句话说如果生产者或消费者的一方发生变化,只要不涉及到缓冲区,则不会影响另一方。 -
提升并发性能
。试想如果我们不使用生产者/消费者模式,而是生产者生产完直接调用消费者的方法通知消费者,那么在调用消费者的方法返回前,线程将会一直阻塞,这种BIO的编程模式,将会大大降低系统吞吐量。回想一下,作为生产者消费者模式的中间件MQ,是不是被用来进行流量削峰和异步化。
2.2 四种实现
以java为例,常见的实现可以分为四类。等待通知、Condition、BlockingQueue、管道输入/输出流。下面我们分别来解释这四种方式。
2.2.1 等待/通知模式
等待/通知模式作为最基础的实现,一般遵循如下标准范式。
等待方
获取对象的锁。
-
如果条件不满足,则调用wait()方法。被唤醒后检查条件是否满足
。 -
条件满足则执行对应的业务逻辑
。
通知方
获取对象锁。
改变条件。
通知所有在此条件上等待的线程。
实现范例
public class ProducerAndConsumerByWaitNotify {
private static final List<Pen> bucket = new ArrayList<>();
private static final int MAX_NUM = 100;
private static class Producer implements Runnable {
//volatile 修饰的变量,用来保证可见性
//线程停止的标识
private volatile boolean running = true;
@Override
public void run() {
//running==true 且 线程未被中断的情况下,循环生产
while (running && !Thread.currentThread().isInterrupted()) {
synchronized (bucket) {
while (bucket.size() == MAX_NUM) {
try {
//篮子满生产者等待
bucket.wait();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
Pen temp = new Pen(UUID.randomUUID().toString());
bucket.add(temp);
System.out.println("生产者[" + Thread.currentThread().getName() + "]生产了[" + temp + "]" + ",目前容量为:" + bucket.size());
//生产后唤醒
bucket.notifyAll();
}
}
}
//通过标识位停止线程
void cancel() {
this.running = false;
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
synchronized (bucket) {
while (bucket.size() == 0) {
try {
System.out.println("无可用容量,[" + Thread.currentThread().getName() + "] 等待...");
//篮子为空,消费者等待
bucket.wait();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
Pen temp = bucket.remove(0);
System.out.println("消费者[" + Thread.currentThread().getName() + "]消费了[" + temp + "]" + ",目前容量为:" + bucket.size());
//唤醒生产者生产
bucket.notifyAll();
}
}
}
}
public static void main(String[] args) {
List<Producer> producerList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Producer producer = new Producer();
new Thread(producer, "Producer-" + i).start();
producerList.add(producer);
}
for (int i = 0; i < 10; i++) {
new Thread(new Consumer(), "Consumer-" + i).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1秒后停止消费者
producerList.forEach(Producer::cancel);
}
public static class Pen {
private String name;
Pen(String name) {
this.name = name;
}
@Override
public String toString() {
return "Pen{" +
"name='" + name + '\'' +
'}';
}
}
}
输出
生产者[Producer-0]生产了[Pen{name='c5b0cb96-752f-433a-9513-a5dad47f1177'}],目前容量为:1
消费者[Consumer-8]消费了[Pen{name='c5b0cb96-752f-433a-9513-a5dad47f1177'}],目前容量为:0
无可用容量,[Consumer-8] 等待...
无可用容量,[Consumer-9] 等待...
无可用容量,[Consumer-7] 等待...
无可用容量,[Consumer-6] 等待...
无可用容量,[Consumer-5] 等待...
无可用容量,[Consumer-3] 等待...
无可用容量,[Consumer-4] 等待...
无可用容量,[Consumer-2] 等待...
无可用容量,[Consumer-0] 等待...
无可用容量,[Consumer-1] 等待...
生产者[Producer-4]生产了[Pen{name='726cf270-93c0-463b-b556-5e0acb22b7ad'}],目前容量为:1
生产者[Producer-1]生产了[Pen{name='888b0775-12f7-48e7-870e-1968485df945'}],目前容量为:2
生产者[Producer-3]生产了[Pen{name='60b6f9c6-013e-4a22-8efe-f6da16949652'}],目前容量为:3
生产者[Producer-2]生产了[Pen{name='dd09da2c-486d-4b43-8551-0434a63db855'}],目前容量为:4
消费者[Consumer-1]消费了[Pen{name='726cf270-93c0-463b-b556-5e0acb22b7ad'}],目前容量为:3
消费者[Consumer-1]消费了[Pen{name='888b0775-12f7-48e7-870e-1968485df945'}],目前容量为:2
消费者[Consumer-1]消费了[Pen{name='60b6f9c6-013e-4a22-8efe-f6da16949652'}],目前容量为:1
消费者[Consumer-1]消费了[Pen{name='dd09da2c-486d-4b43-8551-0434a63db855'}],目前容量为:0
无可用容量,[Consumer-1] 等待...
无可用容量,[Consumer-0] 等待...
无可用容量,[Consumer-2] 等待...
无可用容量,[Consumer-4] 等待...
无可用容量,[Consumer-3] 等待...
无可用容量,[Consumer-5] 等待...
无可用容量,[Consumer-6] 等待...
无可用容量,[Consumer-7] 等待...
无可用容量,[Consumer-9] 等待...
无可用容量,[Consumer-8] 等待...
问:为什么wait()/notify()/notifyAll() 方法必须在synchronzied关键字内使用?
不使用会怎么样:IllegalMonitorStateException && Lost-Wake-Up
原因:wait()/notfy()/notifyAll()本质上都是monitor对象的方法,而Java中获得对象monitor的方法就是使用synchronized
关于Monitor的内容,我们后面再详细探讨。这里要值得注意的细节是:
调用wait()方法会释放锁并进入Mointor的WaitSet队列,直到被唤醒。被唤醒后需要再次竞争锁,线程从wait状态阻塞返回就说明成功获得锁,但重新获得锁这个过程是JVM保证的,不需要我们去参与。
调用notify()/notifyAll()则不会释放锁。所以一般我们把notify()/notifyAll()放在synchronized代码块的最后一行,执行完毕自然会释放锁。
另外,等待/通知模式还演变出另一种模式:超时等待/通知模式。即在等待/通知模式的基础上加了等待时间
long remain=System.currentTimeMillons()+T;//总共需要等待的时长
while(!condition&&remain>0){
wait(remain);//重新等待
remain=remain-System.currentTimeMillons();//剩余需要等待的时长
}
return result;
2.2.2 Condition
jdk 1.5 之前我们使用锁,只能使用synchronized,配合基于Monitor的wait()/notify()可以实现等待/通知模式
。jdk 1.5之后juc包下提供了替代synchronized关键字的基于CAS和双向链表自旋的Lock
。与之相对替换wait()/notify()的则是Condition
。
Condition相比Monitor可以提供更丰富的功能,详细对比如下:
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象锁 | 调用Lock.lock()获取锁,调用Lock.newCondition()获取Condition() |
调用方法 | object.wait(),object.notify() | condition().await(),condition().signAll() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态不响应中断 | 不支持 | 支持 |
当前线程释放锁进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列的一个线程 | 支持 | 支持 |
唤醒等待队列的全部线程 | 支持 | 支持 |
实现
public class ProducerAndConsumerByCondition {
private static final List<Pen> bucket = new ArrayList<>();
private static final int MAX_NUM = 100;
//锁
private static Lock lock = new ReentrantLock();
//生产者条件
private static Condition producerCondition = lock.newCondition();
//消费者条件
private static Condition consumerCondition = lock.newCondition();
private static class Producer implements Runnable {
//volatile 修饰的变量,用来保证可见性
//线程停止的标识
private volatile boolean running = true;
@Override
public void run() {
//running==true 且 线程未被中断的情况下,循环生产
while (running && !Thread.currentThread().isInterrupted()) {
lock.lock();
try {
while (bucket.size() == MAX_NUM) {
try {
//篮子满生产者等待
producerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
Pen temp = new Pen(UUID.randomUUID().toString());
bucket.add(temp);
System.out.println("生产者[" + Thread.currentThread().getName() + "]生产了[" + temp + "]" + ",目前容量为:" + bucket.size());
//生产后唤醒
consumerCondition.signalAll();
} finally {
lock.unlock();
}
}
}
//通过标识位停止线程
void cancel() {
this.running = false;
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
while (bucket.size() == 0) {
try {
System.out.println("无可用容量,[" + Thread.currentThread().getName() + "] 等待...");
//篮子为空,消费者等待
consumerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
Pen temp = bucket.remove(0);
System.out.println("消费者[" + Thread.currentThread().getName() + "]消费了[" + temp + "]" + ",目前容量为:" + bucket.size());
//唤醒生产者生产
producerCondition.signalAll();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
List<Producer> producerList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Producer producer = new Producer();
new Thread(producer, "Producer-" + i).start();
producerList.add(producer);
}
for (int i = 0; i < 10; i++) {
new Thread(new Consumer(), "Consumer-" + i).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1秒后停止消费者
producerList.forEach(Producer::cancel);
}
public static class Pen {
private String name;
Pen(String name) {
this.name = name;
}
@Override
public String toString() {
return "Pen{" +
"name='" + name + '\'' +
'}';
}
}
}
输出
消费者[Consumer-1]消费了[Pen{name='26cfd54d-950d-4676-acf4-456b6033273c'}],目前容量为:29
消费者[Consumer-1]消费了[Pen{name='8b98bf37-09c3-466f-8f9f-f4e5293b1bb3'}],目前容量为:28
消费者[Consumer-1]消费了[Pen{name='1c1695f4-7e5a-44ad-b930-cd30d9ff5b28'}],目前容量为:27
消费者[Consumer-1]消费了[Pen{name='6434e1ed-b95a-4c79-83b1-e31e02cd2589'}],目前容量为:26
消费者[Consumer-1]消费了[Pen{name='65e7f0c4-4353-40d9-977b-727b275f72f4'}],目前容量为:25
消费者[Consumer-1]消费了[Pen{name='26676bfe-d83b-47aa-a20a-483e4fcfb017'}],目前容量为:24
消费者[Consumer-1]消费了[Pen{name='40e17b55-efb1-4afb-879d-ef9bc8f20736'}],目前容量为:23
消费者[Consumer-1]消费了[Pen{name='f9b9a732-9f85-4150-a896-e1297c746a9d'}],目前容量为:22
消费者[Consumer-1]消费了[Pen{name='8de16aeb-2672-4956-9645-9033055b85c4'}],目前容量为:21
消费者[Consumer-1]消费了[Pen{name='4ad12489-f126-4874-a29c-1b90799c491e'}],目前容量为:20
消费者[Consumer-1]消费了[Pen{name='a45edd33-13de-466b-9c45-1799d7f89b91'}],目前容量为:19
消费者[Consumer-1]消费了[Pen{name='d755a385-53b2-4add-a44c-be98bf9bfbe2'}],目前容量为:18
消费者[Consumer-1]消费了[Pen{name='7e8d526b-63ea-4304-8873-c86fd5dba26e'}],目前容量为:17
消费者[Consumer-1]消费了[Pen{name='b575fac1-2a1c-4070-b487-e438f0d68d03'}],目前容量为:16
消费者[Consumer-1]消费了[Pen{name='92a8cf83-9022-4217-9b10-64d8e7c59c8f'}],目前容量为:15
消费者[Consumer-1]消费了[Pen{name='55a9f609-9164-43e6-a4e9-4120c04b6b84'}],目前容量为:14
消费者[Consumer-1]消费了[Pen{name='4728f646-b9d4-4aca-a85e-8ba2014d6ece'}],目前容量为:13
消费者[Consumer-1]消费了[Pen{name='d6e59866-fa1f-4b34-bc92-41b80cb291db'}],目前容量为:12
消费者[Consumer-1]消费了[Pen{name='9e367b4e-11e6-46f7-8bca-e4e7ada4d5c5'}],目前容量为:11
消费者[Consumer-1]消费了[Pen{name='38e4ce28-7843-49df-a9b0-cfc448c43ebd'}],目前容量为:10
消费者[Consumer-1]消费了[Pen{name='a4a3fc7b-c8a4-4589-aaf2-e673c4196f8e'}],目前容量为:9
消费者[Consumer-1]消费了[Pen{name='7a98239f-990c-4724-b394-0fc5cf3f1af9'}],目前容量为:8
消费者[Consumer-1]消费了[Pen{name='ec10872b-4386-4e42-b17d-e55b9c493637'}],目前容量为:7
消费者[Consumer-1]消费了[Pen{name='f9492f37-e757-4bf3-b280-c3c09c26419b'}],目前容量为:6
消费者[Consumer-1]消费了[Pen{name='a414fd9d-9995-4db3-9e08-b5948e80d49a'}],目前容量为:5
消费者[Consumer-1]消费了[Pen{name='c257664e-02ff-41e9-8107-1cf296a4ad4e'}],目前容量为:4
消费者[Consumer-1]消费了[Pen{name='4cdf46be-6b55-498e-9967-fb86b6ed13eb'}],目前容量为:3
消费者[Consumer-1]消费了[Pen{name='44ebc86a-fdb2-46ec-9acf-d4b3de7f7621'}],目前容量为:2
消费者[Consumer-1]消费了[Pen{name='571190b0-1df3-42f5-89d8-200667c24279'}],目前容量为:1
消费者[Consumer-1]消费了[Pen{name='c0252e29-d1cc-4b3f-b68a-99c9cd4c2fd3'}],目前容量为:0
无可用容量,[Consumer-1] 等待...
无可用容量,[Consumer-2] 等待...
无可用容量,[Consumer-4] 等待...
无可用容量,[Consumer-3] 等待...
无可用容量,[Consumer-5] 等待...
无可用容量,[Consumer-6] 等待...
无可用容量,[Consumer-7] 等待...
无可用容量,[Consumer-8] 等待...
无可用容量,[Consumer-9] 等待...
生产者[Producer-0]生产了[Pen{name='85531a03-9da5-487d-9576-bf496d68669a'}],目前容量为:1
生产者[Producer-1]生产了[Pen{name='119c13ab-fe34-43dc-a3cf-cf783f560e74'}],目前容量为:2
生产者[Producer-2]生产了[Pen{name='497c5e0a-bdfa-4c21-8e89-63be057cfb35'}],目前容量为:3
生产者[Producer-3]生产了[Pen{name='d3542a55-e279-4085-8689-1a310331dd79'}],目前容量为:4
生产者[Producer-4]生产了[Pen{name='b6f9b29a-966e-40c5-a0d2-10437e1a6287'}],目前容量为:5
消费者[Consumer-0]消费了[Pen{name='85531a03-9da5-487d-9576-bf496d68669a'}],目前容量为:4
消费者[Consumer-0]消费了[Pen{name='119c13ab-fe34-43dc-a3cf-cf783f560e74'}],目前容量为:3
消费者[Consumer-0]消费了[Pen{name='497c5e0a-bdfa-4c21-8e89-63be057cfb35'}],目前容量为:2
消费者[Consumer-0]消费了[Pen{name='d3542a55-e279-4085-8689-1a310331dd79'}],目前容量为:1
消费者[Consumer-0]消费了[Pen{name='b6f9b29a-966e-40c5-a0d2-10437e1a6287'}],目前容量为:0
无可用容量,[Consumer-0] 等待...
无可用容量,[Consumer-1] 等待...
无可用容量,[Consumer-2] 等待...
无可用容量,[Consumer-4] 等待...
无可用容量,[Consumer-3] 等待...
无可用容量,[Consumer-5] 等待...
无可用容量,[Consumer-6] 等待...
无可用容量,[Consumer-7] 等待...
无可用容量,[Consumer-8] 等待...
无可用容量,[Consumer-9] 等待...
可以看到Condition 能实现 Monitor 同样的等待通知模式,且编码逻辑更加清晰易懂。值得注意的是:调用condition同样要先获取Lock。
问:为什么Condition能实现wait()/notify()/notifyAll()同样的功能呢?
condition内部有一个等待队列(复用AbstractQueuedSynchronizer的Node链表)
调用await()的线程,1)会将当前线程构建成Node添加到Condition的等待队列里,2)并释放锁(释放锁的过程中会使用AQS的模板方法,唤醒AbstractQueuedSynchronized中的同步队列),3)使用LockSupport.park(this) 挂起线程
调用signal()方法的时候,会使用LockSupport.unpark(thread) 唤醒等待队列的头节点(即等待时间最长的节点)
2.2.3 BlockingQueue
可以看到,Condition模式下的等待/通知,不过是基于JDK 数据结构层面实现的另一种等待通知模式而已。那么有没有更简洁的实现方式呢?其实贴心的JDK 已经为我们内置了一种实现方式,也是推荐使用的简单易用的生产者/消费者模式-BlockingQueue
阻塞队列的实现是线程安全的队列
put(e)方法会在队列满的时候阻塞,直到队列有空间或者被打断
take()方法会移除队列头部第一个元素,如果队列为空则阻塞,直到队列非空或者被打断
实现
public class ProducerAndConsumerByBlockingQueue {
private static final int MAX_NUM = 100;
private static final BlockingQueue<Pen> bucket = new ArrayBlockingQueue<>(MAX_NUM);
private static class Producer implements Runnable {
//volatile 修饰的变量,用来保证可见性
//线程停止的标识
private volatile boolean running = true;
@Override
public void run() {
//running==true 且 线程未被中断的情况下,循环生产
while (running && !Thread.currentThread().isInterrupted()) {
Pen temp = new Pen(UUID.randomUUID().toString());
try {
bucket.put(temp);//如果队列满这里会阻塞
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
System.out.println("生产者[" + Thread.currentThread().getName() + "]生产了[" + temp + "]" + ",目前容量为:" + bucket.size());
//生产后唤醒
}
System.out.println("生产者[" + Thread.currentThread().getName() + "] 停止运行");
}
//通过标识位停止线程
void cancel() {
this.running = false;
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
System.out.println("消费者[" + Thread.currentThread().getName() + "]准备消费***" );
Pen temp = bucket.take();//如果队列空这里会阻塞
System.out.println("消费者[" + Thread.currentThread().getName() + "]消费了[" + temp + "]" + ",目前容量为:" + bucket.size());
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
}
}
public static void main(String[] args) {
List<Producer> producerList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Producer producer = new Producer();
new Thread(producer, "Producer-" + i).start();
producerList.add(producer);
}
for (int i = 0; i < 10; i++) {
new Thread(new Consumer(), "Consumer-" + i).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1秒后停止消费者
producerList.forEach(Producer::cancel);
}
public static class Pen {
private String name;
Pen(String name) {
this.name = name;
}
@Override
public String toString() {
return "Pen{" +
"name='" + name + '\'' +
'}';
}
}
}
输出
生产者[Producer-2]生产了[Pen{name='fdd4f44b-493b-41ce-abe5-f6ab9e3a8116'}],目前容量为:1
生产者[Producer-0]生产了[Pen{name='25b6dcb4-4203-4b66-9a3b-67e54692deab'}],目前容量为:1
生产者[Producer-0] 停止运行
生产者[Producer-2] 停止运行
生产者[Producer-3]生产了[Pen{name='31d74a3f-86dc-4624-9c1e-00dd064ea79f'}],目前容量为:1
生产者[Producer-3] 停止运行
消费者[Consumer-1]消费了[Pen{name='722e8b0d-896e-4bcb-bbfb-fd7385133f2a'}],目前容量为:0
消费者[Consumer-1]准备消费***
生产者[Producer-1]生产了[Pen{name='722e8b0d-896e-4bcb-bbfb-fd7385133f2a'}],目前容量为:1
生产者[Producer-1] 停止运行
消费者[Consumer-2]消费了[Pen{name='a3ec14be-3692-4311-a09f-4fc9ded46610'}],目前容量为:0
消费者[Consumer-2]准备消费***
生产者[Producer-4]生产了[Pen{name='a3ec14be-3692-4311-a09f-4fc9ded46610'}],目前容量为:1
生产者[Producer-4] 停止运行
可以看到相比于前两种方式,我们无需去写任何有关并发安全控制的代码,无需做任何同步就实现了生产者/消费者模式。
问:为什么BlockingQueue能在不做同步的情况下线程安全的实现生产者/消费者模式呢?
答:拿ArrayBlockingQueue来说,其内部其实也使用了 Lock +Condition的模式
。感兴趣的可以自己下去阅读,这里贴出部分源码
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
2.2.4 管道输入/输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流的不同之处在于,它主要用于线程之间的数据传输,且传输的媒介为内存
。
具体步骤如下
- 创建PipedWriter
- 创建PipedReader
- PipedWriter写入数据
- PipedReader 读取数据
实现方式
public class ProducerAndConsumerByPied {
private static PipedWriter writer = new PipedWriter();
private static PipedReader reader = new PipedReader();
private static class Producer implements Runnable {
@Override
public void run() {
//running==true 且 线程未被中断的情况下,循环生产
Pen temp = new Pen(UUID.randomUUID().toString());
try {
System.out.println("生产者[" + Thread.currentThread().getName() + "]生产了[" + temp + "]");
writer.write(temp.toString());
} catch (IOException e) {
e.printStackTrace();
}
//生产后唤醒
System.out.println("生产者[" + Thread.currentThread().getName() + "] 停止运行");
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
char[] buff = new char[1024];
try {
reader.read(buff);
} catch (IOException e) {
e.printStackTrace();
}
String temp = new String(buff, 0, buff.length);
System.out.println("消费者[" + Thread.currentThread().getName() + "]消费了[" + temp + "]" );
}
}
public static void main(String[] args) {
try {
writer.connect(reader);
} catch (IOException e) {
e.printStackTrace();
}
new Thread(new Producer(), "Producer-").start();
new Thread(new Consumer(), "Consumer-").start();
}
public static class Pen {
private String name;
Pen(String name) {
this.name = name;
}
@Override
public String toString() {
return "Pen{" +
"name='" + name + '\'' +
'}';
}
}
}
输出
生产者[Producer-]生产了[Pen{name='ce510d3e-8a74-416b-8096-9d32ba518d8a'}]
生产者[Producer-] 停止运行
消费者[Consumer-]消费了[Pen{name='ce510d3e-8a74-416b-8096-9d32ba518d8a'} ]
3. 总结
本文简单介绍了四种实现生产者/消费者模型的方式,并大概介绍了几种方式的实现原理。篇幅所限,对于有些技术细节并未展开来说,这部分我们待后续专题展开来讲。
由于技术水平所限,文章难免有不足之处,欢迎大家指出。希望每位读者都能有新的收获,我们下一篇文章并发编程之AQS探秘见....