JUC--阻塞队列

阻塞队列:
阻塞队列底层是由ReentrantLock和condition实现的
1.JUC提供的阻塞队列
2.阻塞队列的操作方法
3.使用案例

1.JUC提供的阻塞队列

在 Java8 中提供了 7 个阻塞队列
ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行
排序
PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数Comparator 来对元素进行排序。
DelayQueue 优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
LinkedTransferQueue 链表实现的无界阻塞队列
LinkedBlockingDeque 链表实现的双向阻塞队列

2.阻塞队列的操作方法

在阻塞队列中,提供了四种处理方式

  1. 插入操作
    add(e) :添加元素到队列中,如果队列满了,继续插入
    元素会报错,IllegalStateException。
    offer(e) : 添加元素到队列,同时会返回元素是否插入
    成功的状态,如果成功则返回 true
    put(e) :当阻塞队列满了以后,生产者继续通过 put
    添加元素,队列会一直阻塞生产者线程,知道队列可用
    offer(e,time,unit) :当阻塞队列满了以后继续添加元素,
    生产者线程会被阻塞指定时间,如果超时,则线程直接
    退出
  2. 移除操作
    remove():如果元素移除成功,则返回 true,如果移除失败(队列为空),则会抛出异常。
    poll(): 当队列中存在元素,则从队列中取出一个元素,
    如果队列为空,则直接返回 null
    take():基于阻塞的方式获取队列中的元素,如果队列为
    空,则 take 方法会一直阻塞,直到队列中有新的数据可
    以消费
    poll(time,unit):带超时机制的获取数据,如果队列为空,
    则会等待指定的时间再去获取元素返回
3.使用案例
注册成功后增加积分

假如我们模拟一个场景,就是用户注册的时候,在注册成
功以后发放积分。这个场景在一般来说,我们会这么去实


image.png

但是实际上,我们需要考虑两个问题

  1. 性能,在注册这个环节里面,假如添加用户需要花费 1 秒
    钟,增加积分需要花费 1 秒钟,那么整个注册结果的返
    回就可能需要大于 2 秒,虽然影响不是很大,但是在量
    比较大的时候,我们也需要做一些优化
  2. 耦合,添加用户和增加积分,可以认为是两个领域,也
    就是说,增加积分并不是注册必须要具备的功能,但是
    一旦增加积分这个逻辑出现异常,就会导致注册失败。
    这种耦合在程序设计的时候是一定要规避的
    因此我们可以通过异步的方式来实现
    改造之前的代码逻辑
public class UserService {
 public boolean register(){
 User user=new User();
 user.setName("Mic");
 addUser(user);
 sendPoints(user);
 return true;
 }
 public static void main(String[] args) {
 new UserService().register();
 }
 private void addUser(User user){
 System.out.println("添加用户:"+user);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 private void sendPoints(User user){
 System.out.println(" 发 送 积 分 给 指 定 用
户:"+user);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}

改造之后的逻辑

public class UserService {
 private final ExecutorService single = 
Executors.newSingleThreadExecutor();
 private volatile boolean isRunning = true;
 ArrayBlockingQueue arrayBlockingQueue=new 
ArrayBlockingQueue(10);
 {
 init();
 }
 public void init(){
 single.execute(()->{
 while(isRunning){
 try {
 User 
user=(User)arrayBlockingQueue.take();//阻塞的方
式获取队列中的数据
 sendPoints(user);
 } catch (InterruptedException e) 
{
 e.printStackTrace();
 }
 }
 });
 }
 public boolean register(){
 User user=new User();
 user.setName("Mic");
 addUser(user);
 arrayBlockingQueue.add(user);//添加到异步
队列
 return true;
 }
public static void main(String[] args) {
 new UserService().register();
 }
 private void addUser(User user){
 System.out.println("添加用户:"+user);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 private void sendPoints(User user){
 System.out.println(" 发 送 积 分 给 指 定 用
户:"+user);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}

优化以后,整个流程就变成了这样


image.png

例2:
下面是使用阻塞队列实现的生产者-消费者模式:

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
     
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
         
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
    class Producer extends Thread{
         
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容