概述
blockQueue 作为线程容器、阻塞队列,多用于生产者、消费者的关系模式中,保障并发编程线程同步,线程池中被用于当作存储任务的队列,还可以保证线程执行的有序性。
常用方法
生产
- add(obj):往队列里面增加一个对象,如果队列没有空间抛出异常,反之返回true。
- offer(obj): 往队列增加一个对象,返回true/false
- put(obj): 往队列增加一个对象,如果没有空间,则会阻塞改线程,直到有空间.
消费
- poll(time):取出排在首位的对象,如果在一定时间内没有返回,则会返回null
- take():取出排在首位的对象,如果队列中没有数据,则会阻塞该线程直到有数据。
查询
- contains(obj):查询是否存在某个元素,返回true/false
- peek():返回队列头部的元素,无则返回null
特点
- 容量有限,可以限定队列的长度,如果没有主动显示队列长度的情况下,默认长度为Integer.MAX_VALUE
- 内存一致性,遵循happend-before原则,即写操作总是先于后面的读操作。参考资料 happend-before
- 因为其继承Collection接口,所以可以使用集合的接口,但某些接口并不保证立即执行,因为其内部维护着内部锁(ReentrantLock),所以只有在获取锁的情况下才会执行对应的代码,以remove()源码为例:
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
每次操作都会去获取锁,如果锁被其他操作暂用,没有获取到锁,则只能去排队,所以上面代码并不会立即执行。
常用分类
前言
创建队列时,可以添加fair参数,用于声明内部锁是否是公平锁,公平锁用于决定队列里面的任务是否会按照顺讯执行。
公平锁:
显式声明为公平锁的任务执行完全按照队列的顺序执行,新的任务进来会存放在队尾。
非公平锁:
队列里面的任务可以按照顺序执行,但是新的任务可能会与队列争抢CPU资源,不保证队列外的顺序。
- ArrayBlockingQueue,创建固定大小的队列,内部维护一个数组,遵循FIFO原则
- LinkedBlockingQueue,可以自定义队列长度,无指定的情况下默认为Integer.MAX_VALUE,内部维护着一个链表,遵循着FIFO原则
- PriorityBlockingQueue,类似ArrayBlockingQueue,内部维护一个数组,但并不按照FIFO原则,其内部有个compare属性决定队列任务的执行顺序。
- SynchronousQueue,特殊的队列,内部无存储空间维护队列,只有当生产者和消费者同时存在时,才会执行,类似与管道。
例子
- 生产者与消费者案例,一个生产者和多个消费者。
public class BlockQueueDemo {
/**
* 生产者
*/
static class Productor implements Runnable{
private BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Productor(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for(int i=0;i<100;i++){
try {
Thread.sleep(200);
blockingQueue.put(i);
System.out.println("生产者产品了产品"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
static class Consumer implements Runnable{
public BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Consumer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while(true){
try {
String name = Thread.currentThread().getName();
Integer queueData = blockingQueue.take();
System.out.println("消费者"+name+"消费了产品"+queueData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 一个生产者对应多个消费者,采用BlockQueue作为缓冲区
* @param args
*/
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Productor productor = new Productor(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
Consumer consumer2 = new Consumer(blockingQueue);
new Thread(productor).start();
new Thread(consumer).start();
new Thread(consumer2).start();
}
}
执行结果:
消费者Thread-2消费了产品21
生产者产品了产品22
消费者Thread-1消费了产品22
生产者产品了产品23
消费者Thread-2消费了产品23
生产者产品了产品24
消费者Thread-1消费了产品24
生产者产品了产品25
消费者Thread-2消费了产品25
生产者产品了产品26
消费者Thread-1消费了产品26
生产者产品了产品27
消费者Thread-2消费了产品27