在 Java 多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。
Java提供的线程安全的 Queue 可以分为
-
阻塞队列,典型例子是
LinkedBlockingQueue
适用阻塞队列的好处:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距。 -
非阻塞队列,典型例子是
ConcurrentLinkedQueue
当许多线程共享访问一个公共集合时,ConcurrentLinkedQueue
是一个恰当的选择。
LinkedBlockingQueue
多用于任务队列
ConcurrentLinkedQueue
多用于消息队列
下面分别介绍下JDK中阻塞队列和非阻塞队列的各种实现。
Concurrent Queue的两种实现(非阻塞队列)
1. ConcurrentLinkedQueue
a) 基于链表节点, 无界队列。注意size需要遍历整个链表,且如果有其他修改的操作会导致size不准确;尽量使用isEmpty。
b) 通过无锁CAS的方式操作元素,实现了高并发状态下的高性能。
2. ConcurrentLinkedDeque
线程安全双端队列,实现方式基本同ConcurrentLinkedQueue。
BlockingQueue的各种实现(阻塞队列)
BlockingQueue扩展Queue的方法,使其在在队列已满的情况下添加元素时有了4种模式:
其一,抛异常;boolean add(E e);
其二,立即返回false;boolean offer(E e);
其三,等待;void put(E e) throws InterruptedException;
其四,等待一段时间。 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
JDK中对BlockingQueue提供了近十种实现,学习的时候可以从两个主要方面入手,1) 有界还是无界 2)安全性是如何保证的。
1. ArrayBlockingQueue
a) 内部维护了一个定长数组,以便缓存队列中的数据对象。有界队列。
b) 从其关键属性可以看出来,安全访问控制通过ReentrantLock和Condition配合实现,固每次入列和出列时需要获得全局的锁,因此是不能完全并行的。也有人称其为内部没有实现读写分离。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
2. LinkedBlockingQueue
基于链表的阻塞队列。
a) 其内部也维持着一个数据缓冲队列〈该队列由一个链表构成);可以指定容量,也可以不指定,不指定的话,默认最大Integer.MAX_VALUE
。也可以算是个无界队列吧。
b) 其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行,因此能够高效的处理并发数据。
/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
3. SynchronousQueue
一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。内部提供了TransferQueue
和TransferStack
,其实是对应FIFO, FILO,模拟公平锁和非公平锁。注意,在没有需要的生产者或者消费者的时候,SynchronousQueue
会通过park阻塞线程。
4. PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparator对象来决定,传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
a) 底层是HEAP实现,数组可扩展,因此是无界队列,因此put永远不会阻塞。
b) 锁使用ReentrantLock,以及一个nonEmpty Condition,只有在队列为空,take的时候会阻塞。
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
/**
* The number of elements in the priority queue.
*/
private transient int size;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private transient Comparator<? super E> comparator;
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
5. 双端队列BlockingDeque及实现LinkedBlockingDeque
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
可以看出来由于只有一个锁,其实也是不能做到真正意义上的两头并行操作的。
5. DelayQueue
带有延迟时间的Queue
,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue
中的元素必须实现Delayed
接口,DelayQueue
是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
对这个队列的实现可以结合ScheduledThreadPoolExecutor
相关来看。
终于基本把线程安全Queue相关的实现类都过了一遍了,项目中要结合应用场景选择最合适的使用。这需要建立在对各种实现都很熟悉的基础上。
引用
LinkedBlockingQueue 和 ConcurrentLinkedQueue的用法及区别
java挑战高并发(14): LinkedBlockingQueue和ConcurrentLinkedQueue的区别及用法
java线程安全之并发Queue(十三)