一. ArrayBlockingQueue
内部使用数组存储元素,利用reentrantlock实现线程安全,在创建的时候必须指定容量,之后也不可以再扩容了,在构造函数中可以指定是否公平,第一个参数是容量,第二个参数是是否公平。正如ReentrantLock一样,如果ArrayBlockingQueue被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
二. LinkedBlockingQueue
内部使用链表实现,可以指定容量,如果不指定,则为Integer.MAX_VALUE,就是说这个容量可以无限大,所以这个队列也被称为无界队列。基于链接节点的可选边界阻塞队列。该队列对元素进行 FIFO(先进先出,first in, first out)排序。队列的头部是队列中时间最长的元素。队列的尾部是在队列中时间最短的元素
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
三.SynchronousQueue
这个队列最大的不同就是,他不会存储元素,放元素和取元素的时候都会阻塞队列,比如我放一个数据到队列中,我是不能够立马返回的,我必须等待别人把我放进去的数据消费掉了,才能够返回
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
四.PriorityBlockingQueue
前面我们所说的ArrayBlockingQueue和LinkedBlockingQueue都是采用先进先出的顺序进行排序,可是如果有的时候我们需要自定义排序怎么办呢?这时就需要使用PriorityBlockingQueue。
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是Comparable的,否则会抛出ClassCastException异常。
//带比较类型的构造方法
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
五.DelayQueue
DelayQueue这个队列比较特殊,具有“延迟”的功能。我们可以设定让队列中的任务延迟多久之后执行,比如10秒钟之后执行,这在例如“30分钟后未付款自动取消订单”等需要延迟执行的场景中被大量使用。它是无界队列,放入的元素必须实现Delayed接口,而Delayed接口又继承了Comparable接口,所以自然就拥有了比较和排序的能力,代码如下:
import java.util.concurrent.DelayQueue;
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 DelayQueue
DelayQueue<DelayElement> queue = new DelayQueue<>();
// 将三个延迟元素添加到队列中
queue.put(new DelayElement(1, 1000));
queue.put(new DelayElement(2, 2000));
queue.put(new DelayElement(3, 3000));
// 取出并输出队列中的元素
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
class DelayElement implements Delayed {
private int data;
private long delayTime;
private long expire;
public DelayElement(int data, long delayTime) {
this.data = data;
this.delayTime = delayTime;
this.expire = System.currentTimeMillis() + delayTime;
}
// 实现 compareTo 方法
@Override
public int compareTo(Delayed o) {
if (this.expire < ((DelayElement)o).expire) {
return -1;
} else if (this.expire > ((DelayElement)o).expire) {
return 1;
} else {
return 0;
}
}
// 实现 getDelay 方法
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 重写 toString 方法
@Override
public String toString() {
return String.valueOf(this.data);
}
}