综合实例2--生产者消费者模式(阻塞队列版本)

1、知识点回顾

  • volatile
  • CAS
  • atomicInteger
  • BlockingQueue
  • 线程交互
  • 原子引用

2、生产者消费者模式(阻塞队列版本)

生产者消费者模式(阻塞队列版本)
package com.company;

import org.omg.CORBA.TIMEOUT;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource
{
    //volatile 保证可见性,并禁止指令重排
    private volatile Boolean FLAG = true;//默认开启:进行生产消费
    //多线程环境下不能使用 i++
    private AtomicInteger atomicInteger = new AtomicInteger();//默认值为 0 ,用来表示库存

    //使用阻塞队列”接口“,用以满足所以阻塞队列的需要
    BlockingQueue<String> blockingQueue = null;

    MyResource(BlockingQueue<String> blockingQueue){
        //创建时,指定阻塞队列的种类-----------写,要足够的抽象,往高处写
        this.blockingQueue = blockingQueue;
        //要保留世界使用的类的信息-------------查,要足够的落地,往细节查
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws InterruptedException {
        //定义变量放在循环外,尽量复用变量引用
        String data = null;
        Boolean reValue;
        while(FLAG){
            data = atomicInteger.incrementAndGet()+"";//++i
            reValue = blockingQueue.offer(data,2, TimeUnit.SECONDS);
            if (reValue){
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"\t生产被叫停了");
    }

    public void myConsumer() throws InterruptedException{
        String result = null ;
        while(FLAG){
            result = blockingQueue.poll(2,TimeUnit.SECONDS);
            if(null == result || result.equalsIgnoreCase("")){
                //FLAG = false;
                System.out.println(Thread.currentThread().getName()+"\t超过2秒钟没有取到货物,消费退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
        }
    }

    public void stop(){
        this.FLAG = false;
    }
}
/**
 *
 * volatile / CAS / atomicInteger / 原子引用 / 线程交互 / BlockingQueue
 */
public class ProdConsumer_BlockquoteDemo {
    public static void main(String[] args) throws Exception{
        //BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
        MyResource myResource = new MyResource(blockingQueue);

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t生成线程启动");
            try {
                myResource.myProd();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Prod").start();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t消费线程启动");
            try {
                myResource.myConsumer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        },"Consume").start();

        TimeUnit.SECONDS.sleep(5);

        myResource.stop();
        System.out.println(Thread.currentThread().getName()+"\t叫停活动");
    }
}

Console:

java.util.concurrent.ArrayBlockingQueue
Prod    生成线程启动
Consume 消费线程启动
Prod    插入队列1成功
Consume 消费队列1成功
Prod    插入队列2成功
Consume 消费队列2成功
Prod    插入队列3成功
Consume 消费队列3成功
Prod    插入队列4成功
Consume 消费队列4成功
Prod    插入队列5成功
Consume 消费队列5成功
main    叫停活动
Prod    生产被叫停了
Consume 超过2秒钟没有取到货物,消费退出

3、好处是,不需要关心具体什么时间阻塞,什么时间唤醒线程,统统交给BlockingQueue,按照规矩办事。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容