Java中Queue、BlockingQueue和队列实现生产者消费者模式

1. Queue接口 - 队列

public interface Queue<E> 
    extends Collection<E>
  • Collection的子接口,表示队列FIFO(First In First Out)
    常用方法:
    (1)抛出异常
    boolean add(E e) // 顺序添加1个元素(到达上限后,再添加则会抛出异常)
    E remove() // 获得第1个元素并移除(如果队列没有元素时,则抛异常)
    E element() // 获得第1个元素但不移除(如果队列没有元素时,则抛异常)
    (2)返回特殊值【推荐】
    boolean offer(E e) // 顺序添加1个元素(到达上限后,再添加则会返回false)
    E poll() // 获得第1个元素并移除(如果队列没有元素时,则返回null)
    E keep() // 获得第1个元素但不移除(如果队列没有元素时,则返回null)

1.1 ConcurrentLinkedQueue类(线程安全)

public class ConcurrentLinkedQueue<E> 
    extends AbstractQueue<E> 
    implements Queue<E>, Serializable

说明:

  • 线程安全、可高效读写的队列,高并发下性能最好的队列
  • 无锁、CAS比较交换算法,修改的方法包含3个核心参数(V,E,N);
  • V:要更新的变量、E:预期值、N:新值
  • 只有当V==E时,V=N;否则表示已被更新过,则取消当前操作。

使用示例:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestQueue {
      public static void main(String[] args) {
            // 列表:尾部追加 - add...
            // 链表:头尾添加 - addFirst/addLast
            // 队列:先进先出(FIFO) - offer...
            // >>> 以上三种的对应成员方法,切记不能混用!会打乱已知规则。
            LinkedList<String> link = new LinkedList<String>();
            //Queue<String> link = new LinkedList<String>(); // 强制LinkedList遵循队列的规则
            link.offer("A"); // offer用的是FIFO队列方式
            link.offer("B");
            link.offer("C");
            // 用列表的方式打乱了FIFO队列的规则
            link.add(0, "D");
            System.out.println(link.peek()); // D
            
            // 线程安全的队列Queue
            // 严格遵循队列规则,线程安全,采用CAS交换算法
            Queue<String> q = new ConcurrentLinkedQueue<String>();
            // 1.抛出异常的 2.返回结果的
            q.offer("A");
            q.offer("B");
            q.offer("C");
            
            q.poll(); // 删除表头,表头更新为B
            
            System.out.println(q.peek()); // 获取表头,此时为B
      }
}

2. BlockingQueue接口 - 阻塞队列

public interface BlockingQueue<E> 
    extends Queue<E>

常用方法:
void put(E e) // 将指定元素插入此队列中,如果没有可用空间,则死等
E take() // 获取并移除此队列头部元素,如果没有可用元素,则死等
说明:

  • Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法
  • 可用于解决生产者、消费者问题

2.1 ArrayBlockingQueue类(有界阻塞队列)

  • 数组结构实现,有界队列。手工固定上限
BlockingQueue<String> abq = new ArrayBlockingQueue<String>(3);

2.2 LinkedBlockingQueue类(无界阻塞队列)

  • 链表结构实现,无界队列。默认上限Integer.MAX_VALUE
BlockingQueue<String> lbq = new LinkedBlockingQueue<String>();

3. 源码:BlockingQueue实现生产者消费者模式

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await()/signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

  • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞
  • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞
import java.util.concurrent.LinkedBlockingQueue;
public class TestProduceAndCustomer2 {
    public static void main(String[] args) {
        StorageQ s = new StorageQ();
        Thread p1 = new Thread(new ProducerQ(s), "A厂");
        Thread p2 = new Thread(new ProducerQ(s), "B厂");
        Thread p3 = new Thread(new ProducerQ(s), "C厂");

        Thread c1 = new Thread(new CustomerQ(s), "a人");
        Thread c2 = new Thread(new CustomerQ(s), "b人");
        Thread c3 = new Thread(new CustomerQ(s), "c人");
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

// 仓库 - 共享资源对象
class StorageQ {
    // 仓库存储的载体 - 使用无界阻塞队列,也可指定容量大小。
    private LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<>(10);
    public StorageQ() {
        super();
    }
    public StorageQ(LinkedBlockingQueue<Object> lbq) {
        super();
        this.lbq = lbq;
    }
    public LinkedBlockingQueue<Object> getLbq() {
        return lbq;
    }
    public void setLbq(LinkedBlockingQueue<Object> lbq) {
        this.lbq = lbq;
    }

    // 生产
    public void produce() {
        try{
            lbq.put(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName()
                    + "】生产一个产品,现库存" + lbq.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    
    // 消费
    public void custome() {
        try{
            lbq.take();
            System.out.println("【消费者" + Thread.currentThread().getName()
                    + "】消费了一个产品,现库存" + lbq.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

// 生产者
class ProducerQ implements Runnable {
    private StorageQ s;
    public ProducerQ() {}
    public ProducerQ(StorageQ s) {
        this.s = s;
    }
    public void run() {
        while (true) {
            try {
                Thread.sleep((int) (Math.random() * 2000));
                this.s.produce();  // 没满 + 可锁 = 生产+1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class CustomerQ implements Runnable {
    private StorageQ s;
    public CustomerQ() {}
    public CustomerQ(StorageQ s) {
        this.s = s;
    }
    public void run() {
        while (true) {
            try {
                Thread.sleep((int) (Math.random() * 2000));
                this.s.custome(); // 不空 + 可锁 = 消费-1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:


Java队列实现生产者与消费者
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 阻塞队列提供了可阻塞的put 和take方法, 以及支持定时的offer和poll方法。如果队列已经满了, 那么p...
    好好学习Sun阅读 5,599评论 0 4
  • 引言 JDK中除了上文提到的各种并发容器,还提供了丰富的阻塞队列。阻塞队列统一实现了BlockingQueue接口...
    小刀爱编程阅读 3,671评论 0 0
  • 1、简介 Queue(队列):一种特殊的线性表,它只允许在表的前端(front)进行删除操作,只允许在表的后端(r...
    瑜小贤阅读 3,610评论 1 0
  • java笔记第一天 == 和 equals ==比较的比较的是两个变量的值是否相等,对于引用型变量表示的是两个变量...
    jmychou阅读 5,400评论 0 3
  • 一直都知道自己是个没有智商的人,活到今天才发觉我连情商都少得可怜;从小是个伶牙俐齿的人,谁知现在说话都不...
    猪麦兜阅读 877评论 0 0