场景
生产者生产数据,消费者消费数据;
但是性能处理速度均有差异,因而需要一个中间队列协调;
举例
3个厨师做甜点,有3个吃货来吃,如果厨师和吃货一对一,厨师的生产速度和吃货的吃饭速度均成成了短板,可能存在这样一种不协调的场景;
- 慢性子厨师款待一个吃饭快的胖吃货,吃货都要饿死了,只能死等;
- 急性子厨师款待一个吃饭慢的瘦子吃货(比如我女朋友),吃货吃不完,堆了一堆甜点
这个时候怎么办比较好呢?
加一个橱柜,糕点生产好一个一个放在橱柜里,吃货排队一个一个拿,这样是不是好多了呢,橱柜协调了厨师和吃货的关系,减少了浪费;
角色
生产者 厨师
消费者 吃货
数据 糕点
中间队列 桌子
提升与思考
代码放在最后,先讲几个问题深入大家理解
- 队列可以有多种实现(Strategy Pattern)
- 先放先出 FIFO
- 后放先出 LIFO
- 优先级队列 Priority Quene
-
为什么生产和消费要用多线程?
这也是我在实际应用中感受最深的,答案只有一个,单线程太耗时
只有当多线程的效率提升可以抵消开发难度和性能消耗时才有必要用多线程 -
别忘记sychronized 和 notifyAll()
否则其他线程一直等待,不会继续 -
生产线程和消费线程的数量匹配
上述问题可以只有一个生产线程和一个消费线程(PIPE模式)
当生产比较费时的时候生产线程多一些,消费耗时时(如网络IO)多一些消费线程(一对多对应线程池的WorkThread设计模式)
-
当厨师效率较低时(Thread.sleep(1000))
面包总是被吃光,可以增加厨师或者减少吃货
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
厨师乙 在桌子上放入了 厨师乙产生的第1个面包
吃货甲 从桌子上取走了 厨师乙产生的第1个面包
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
厨师甲 在桌子上放入了 厨师甲产生的第2个面包
吃货丙 从桌子上取走了 厨师甲产生的第2个面包
面包被吃货吃光,等待厨师生产
厨师丙 在桌子上放入了 厨师丙产生的第3个面包
吃货乙 从桌子上取走了 厨师丙产生的第3个面包
面包被吃货吃光,等待厨师生产
厨师乙 在桌子上放入了 厨师乙产生的第4个面包
吃货甲 从桌子上取走了 厨师乙产生的第4个面包
面包被吃货吃光,等待厨师生产
厨师丙 在桌子上放入了 厨师丙产生的第5个面包
吃货丙 从桌子上取走了 厨师丙产生的第5个面包
面包被吃货吃光,等待厨师生产
面包被吃货吃光,等待厨师生产
-
当厨师效率较高时(Thread.sleep(500)时)
总是放不下,这时可以减少厨师或者增加吃货
厨师丙 在桌子上放入了 厨师丙产生的第18个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
吃货丙 从桌子上取走了 厨师甲产生的第11个面包
厨师乙 在桌子上放入了 厨师乙产生的第7个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
吃货乙 从桌子上取走了 厨师丙产生的第16个面包
厨师丙 在桌子上放入了 厨师丙产生的第19个面包
桌子放满了面包,不能再放了
吃货丙 从桌子上取走了 厨师丙产生的第18个面包
厨师甲 在桌子上放入了 厨师甲产生的第17个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
吃货乙 从桌子上取走了 厨师乙产生的第7个面包
厨师甲 在桌子上放入了 厨师甲产生的第22个面包
桌子放满了面包,不能再放了
桌子放满了面包,不能再放了
所以中间队列给我们提供了一个缓冲区,用于协调生产者和消费者间的性能差异,控制互斥
总结:
- 线程的协调运行需要"放在中间的东西"
- 线程的互斥处理需要"保护中间的东西"
- 中间队列使得两端协调成为肯能
代码
- 生产者
/**
* @author xuhe
* @description 生产者线程
* @date 2018/5/17
*/
public class ProducerThread extends Thread {
public static int index;
private String threadName;
private DataChannel dataChannel;
private Random random;
public ProducerThread(String threadName, DataChannel dataChannel, long randomSeed) {
this.threadName = threadName;
this.dataChannel = dataChannel;
this.random = new Random(randomSeed);
setName(threadName);
}
@Override
public void run() {
try {
while (true){
Thread.sleep(random.nextInt(500));
Data data= new Data();
data.setName(String.format("%s产生的第%s个面包",threadName,getIndex()));
dataChannel.put(data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private synchronized int getIndex(){
return ++index;
}
}
- 消费者
package com.microparticletech.producer_consumer;
import java.util.Random;
/**
* @author xuhe
* @description 消费者线程
* @date 2018/5/17
*/
public class ConsumerThread extends Thread {
private DataChannel dataChannel;
private String threadName;
private Random random;
public ConsumerThread(String threadName,DataChannel dataChannel, long randomSeed) {
this.threadName = threadName;
this.dataChannel=dataChannel;
this.random = new Random(randomSeed);
setName(threadName);
}
@Override
public void run() {
System.out.println(threadName+"启动");
try {
while (true){
Data data=new Data();
data=dataChannel.get();
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 数据
@lombok.Data
public class Data {
private String name;
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return
"name='" + name + '\'';
}
}
- 中间队列
public interface DataChannel {
void put(Data data) throws InterruptedException;
Data get() throws InterruptedException;
}
public class DataChannelFIFOImpl implements DataChannel {
//先进先出的队列实现
private static final int CAPACITY=3;
private Data[] dataArray = new Data[CAPACITY];
private int head;
private int tail;
private int count;
@Override
public synchronized void put(Data data) throws InterruptedException {
while (count>=CAPACITY){
System.out.println("桌子放满了面包,不能再放了");
wait();
}
System.out.println(String.format("%s 在桌子上放入了 %s",Thread.currentThread().getName(),data.getName()));
dataArray[tail]=data;
tail=(tail+1)%CAPACITY;
count++;
notifyAll();
}
@Override
public synchronized Data get() throws InterruptedException {
while (count<=0){
System.out.println("面包被吃货吃光,等待厨师生产");
wait();
}
Data data=dataArray[head];
System.out.println(String.format("%s 从桌子上取走了 %s",Thread.currentThread().getName(),data.getName()));
count--;
head=(head+1)%CAPACITY;
notifyAll();
return data;
}
}
- 启动类
public class MainTest {
public static void main(String[] args) {
int size=3;
DataChannel channel = new DataChannelFIFOImpl();
Thread producer1 = new ProducerThread("厨师甲" , channel, 31415);
Thread producer2 = new ProducerThread("厨师乙" , channel, 92653);
Thread producer3 = new ProducerThread("厨师丙" , channel, 58979);
producer1.start();
producer2.start();
producer3.start();
Thread consumer1 = new ConsumerThread("吃货甲" , channel, 32384);
Thread consumer2 = new ConsumerThread("吃货乙" , channel, 62643);
Thread consumer3 = new ConsumerThread("吃货丙" , channel, 38327);
consumer1.start();
consumer2.start();
consumer3.start();
}
}