1、知识点回顾
- volatile
- CAS
- atomicInteger
- BlockingQueue
- 线程交互
- 原子引用
2、生产者消费者模式(阻塞队列版本)
package com.company;
import org.omg.CORBA.TIMEOUT;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyResource
{
//volatile 保证可见性,并禁止指令重排
private volatile Boolean FLAG = true;//默认开启:进行生产消费
//多线程环境下不能使用 i++
private AtomicInteger atomicInteger = new AtomicInteger();//默认值为 0 ,用来表示库存
//使用阻塞队列”接口“,用以满足所以阻塞队列的需要
BlockingQueue<String> blockingQueue = null;
MyResource(BlockingQueue<String> blockingQueue){
//创建时,指定阻塞队列的种类-----------写,要足够的抽象,往高处写
this.blockingQueue = blockingQueue;
//要保留世界使用的类的信息-------------查,要足够的落地,往细节查
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws InterruptedException {
//定义变量放在循环外,尽量复用变量引用
String data = null;
Boolean reValue;
while(FLAG){
data = atomicInteger.incrementAndGet()+"";//++i
reValue = blockingQueue.offer(data,2, TimeUnit.SECONDS);
if (reValue){
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"\t生产被叫停了");
}
public void myConsumer() throws InterruptedException{
String result = null ;
while(FLAG){
result = blockingQueue.poll(2,TimeUnit.SECONDS);
if(null == result || result.equalsIgnoreCase("")){
//FLAG = false;
System.out.println(Thread.currentThread().getName()+"\t超过2秒钟没有取到货物,消费退出");
return;
}
System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
}
}
public void stop(){
this.FLAG = false;
}
}
/**
*
* volatile / CAS / atomicInteger / 原子引用 / 线程交互 / BlockingQueue
*/
public class ProdConsumer_BlockquoteDemo {
public static void main(String[] args) throws Exception{
//BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
MyResource myResource = new MyResource(blockingQueue);
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t生成线程启动");
try {
myResource.myProd();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Prod").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t消费线程启动");
try {
myResource.myConsumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Consume").start();
TimeUnit.SECONDS.sleep(5);
myResource.stop();
System.out.println(Thread.currentThread().getName()+"\t叫停活动");
}
}
Console:
java.util.concurrent.ArrayBlockingQueue
Prod 生成线程启动
Consume 消费线程启动
Prod 插入队列1成功
Consume 消费队列1成功
Prod 插入队列2成功
Consume 消费队列2成功
Prod 插入队列3成功
Consume 消费队列3成功
Prod 插入队列4成功
Consume 消费队列4成功
Prod 插入队列5成功
Consume 消费队列5成功
main 叫停活动
Prod 生产被叫停了
Consume 超过2秒钟没有取到货物,消费退出