话不多说,直接上代码:
- 资源类:
/**
* 资源类
*/
class DataSource{
private volatile boolean FLAG=true;
private AtomicInteger atomicInteger=new AtomicInteger();
BlockingQueue<String> blockingDeque=null;
public DataSource(BlockingQueue blockingDeque){
this.blockingDeque=blockingDeque;
System.out.println(blockingDeque.getClass().getName());
}
public void produce() throws Exception{
String data=null;
boolean retValue;
while (FLAG) {
data=atomicInteger.incrementAndGet()+"";
retValue=blockingDeque.offer(data,2L, TimeUnit.SECONDS);
if (retValue){
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
}else {
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"\t叫停:表示FLAG=false,生产动作结束");
}
public void consumer() throws Exception{
String result=null;
while (true){
result=blockingDeque.poll(2L,TimeUnit.SECONDS);
if (result ==null || result.equalsIgnoreCase("")){
System.out.println(Thread.currentThread().getName()+"\t超过2S没有取到,退出");
return;
}
System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
}
}
public void stop(){
FLAG=false;
}
- main方法
public class ProducerConsumerBlockQueue {
public static void main(String[] args) {
DataSource dataSource= new DataSource(new SynchronousQueue(true));
new Thread(() ->{
System.out.println(Thread.currentThread().getName()+"\t生产线程启动");
try {
dataSource.produce();
} catch (Exception e) {
e.printStackTrace();
}
},"producer").start();
new Thread(() ->{
System.out.println(Thread.currentThread().getName()+"\t消费线程启动");
try {
dataSource.consumer();
} catch (Exception e) {
e.printStackTrace();
}
},"consumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("5s时间到,停止主线程");
dataSource.stop();
}
}
- 结果
java.util.concurrent.SynchronousQueue
producer 生产线程启动
consumer 消费线程启动
consumer 消费队列1成功
producer 插入队列1成功
producer 插入队列2成功
consumer 消费队列2成功
producer 插入队列3成功
consumer 消费队列3成功
consumer 消费队列4成功
producer 插入队列4成功
producer 插入队列5成功
consumer 消费队列5成功
5s时间到,停止主线程
producer 叫停:表示FLAG=false,生产动作结束
consumer 超过2S没有取到,退出
- 注意:
传入的参数可以是
DataSource dataSource= new DataSource(new ArrayBlockingQueue(10));