线程池中的队列要求的是阻塞队列,作用主要是当线程池处理任务能力不足时,队列存储多余的任务,从而起到削峰和缓冲的目的。
可以选择的队列种类很多,如何选择合适的队列应用到自己的线程池中?就需要了解他们的优缺点,从而择优使用
1、常见阻塞队列
常见的阻塞队列都是以基于BlockingQueue的实现
ArrayBlockingQueue
一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue
一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),此队列按FIFO (先进先出) 排序元素。Executors的几个静态线程池工厂方法大部分都是使用这个队列
SynchronousQueue
一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;同理,当每个读操作的时候,同样需要一个相匹配的写操作。这里的 Synchronous 指的就是读写操作需要同步,一个读操作对应一个写操作。
注:吞吐量通常要高于LinkedBlockingQueue。静态工厂方法Executors.newCachedThreadPool使用了这个队列。
DelayQueue
是一个支持延时获取元素的无界阻塞队列。内部是基于PriorityQueue的实现。
PriorityBlockingQueue
一个具有优先级的无限阻塞队列。只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容
注:它是无界队列,put操作不会阻塞,但take方法在队列为空的时候会阻塞队列元素不可以插入null值,同时插入队列的元素必须是可比较大小的(comparable),否则报 ClassCastException 异常
2、最常使用的两种队列
2.1、ArrayBlockingQueue
是一个基于数组结构的有界阻塞队列,底层结构是一个数组
/** The queued items */
final Object[] items;
创建队列时,在构造器中创建的数组对象(最大的容量在创建时就确定了)
/** items index for next take, poll, peek or remove */
int takeIndex;
外界下次从队列获取数据时,指定从队列的takeIndex下标获取
/** items index for next put, offer, or add */
int putIndex;
外界下次向队列存入数据时,指定从队列的putIndex下标存入
/** Number of elements in the queue */
int count;
队列中实际存储数据的数量(即数组中真正有数据的元素数量)
/** Main lock guarding all access */
final ReentrantLock lock;
外界存储和读取队列数据时,存在并发情况。
这里使用一把锁同时控制读写数据。因此实际上读写是串行的。
正因如此,count的类型是int而非AtomicInteger,不需要考虑线程安全
/** Condition for waiting takes */
private final Condition notEmpty;
lock的Condition控制,含义是非空。
即队列无数据时,notEmpty.await()阻塞;有数据时,notEmpty.signal()。
从而控制线程间调度
/** Condition for waiting puts */
private final Condition notFull;
同样是lock的Condition控制,含义是非满,
即队列数据数量达到最大时,notFull.await()阻塞;
有数据时,notFull.signal()。从而控制线程间调度
外部存储数据时,从头开始向后遍历数组插入数据,并记录偏移量,供下次从偏移量位置再次存储;
而读取数据时也是一样,从头开始向后遍历数组读取并删除数据,记录偏移量供下次从偏移量位置再次读取
另外考虑一件问题:由于数组有界,总会读写到最后一个元素。数组前部分读取后置空,如果不再使用就浪费了。而后部分到达了尾部,队列不能再插入数据了
这就引入ArrayBlockingQueue的一个设计,即到达尾部后,若头部空置,则复用头部资源。
让线性的有界数组,在逻辑上成为环形数组,从而达到资源复用的目的。
2.2、LinkedBlockingQueue
一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),底层结构是一个单向链表
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
创建队列时,在构造器中指定队列最大的容量
/**Head of linked list.Invariant: head.item == null*/
transient Node<E> head;
链表的头部节点
这里的头部节点其实并没有存储真实数据,下一个节点才开始存储。
这是链表常用的结构,通过在头部增加一个空节点,从而使所有有效节点都是
链表中间节点,增删是可以统一处理。否则就要区分头部节点和中间节点,
导致区分处理了。
/**Tail of linked list.Invariant: last.next == null*/
private transient Node<E> last;
链表的尾部节点
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
外部多线程存储数据时,存在并发场景。因此使用putLock和notFull加锁控制。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
外部多线程读取数据时,存在并发场景。因此使用takeLock和notEmpty加锁控制。
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
链表中实际存储数据的数量,含义对应ArrayBlockingQueue的count。
但它的类型是AtomicInteger,由于链表读写为不同的锁,存在并发场景,
因此加减数量时要考虑并发安全问题
外部存储数据时在尾部插入数据;而读取数据时则从头部读取并删除节点数据
3、ArrayBlockingQueue和LinkedBlockingQueue的选择
ArrayBlockingQueue | LinkedBlockingQueue | |
---|---|---|
底层结构 | 数组 (逻辑上环形数组) |
单向链表 |
是否有界 | 有界阻塞队列 (大小必须声明) |
有界阻塞队列 (不声明则默认为Integer.MAX_VALUE) |
公平锁 | 可配置是否使用公平锁 (默认非公平) |
仅支持非公平锁 |
锁 | 读写共用一把ReentrantLock锁 两个condition判断满空状态 |
读写分别使用一把ReentrantLock锁 使用各自的condition判断满空状态 |
count | 共用一把锁,因此队列内无并发,类型为int | 读写两把锁,队列内存在并发,因此类型为AtomicInteger |
其他 | 由于是链表,因此插入的数据要多创建一个Node对象存,会对GC有影响 |
其他异同点:
- LinkedBlockingQueue底层由于是链表,因此插入数据时要多创建一个Node对象,因此会对GC有影响
- ArrayBlockingQueue从头开始遍历进行读写;而LinkedBlockingQueue则为链尾加元素,链尾取元素
- LinkedBlockingQueue读写各加一把锁,通常比ArrayBlockingQueue具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。
4、自定义队列大小
无论是ArrayBlockingQueue和LinkedBlockingQueue,他们的队列大小都是不可变的。
ArrayBlockingQueue底层是数组,大小固定。
而LinkedBlockingQueue的capacity则被final修饰,不可修改。
而我们实际项目中,往往需要根据业务实际需要调整队列大小。那么如果实现队列的大小可变?
如果我们想基于ArrayBlockingQueue进行改造,但修改大小必然要涉及到重新创建数组,以及新旧数组的数据迁移问题,有些复杂。
如果考虑基于LinkedBlockingQueue进行改造,我们只要将修饰capacity的final去掉即可实现动态调整。但有一个问题,LinkedBlockingQueue具体实现中很多是基于capacity不变进行的设计,因此我们需要将涉及的功能进行调整
调整思路:
1、capacity可修改大小:去掉修饰词final,增加set方法便可动态调整
2、梳理受影响的范围:将代码中应用capacity的功能进行梳理,通过capacity的改动使其兼容正常功能
具体进行以下调整,其他内容不变:
4.1、类名修改
将LinkedBlockingQueue的代码实现拷贝并修改类名为ResizeLinkedBlockingQueue
4.2、将capacity的修饰词final去掉,增加volatile修饰词。改动后使其立即生效。并设置capacity的set方法,并在set时限制大小
4.3、梳理capacity所涉及的各个应用点,进行调整
主要是存储数据的几个方法条件判断时进行改动
------The End------
如果这个办法对您有用,或者您希望持续关注,可以在wx公众号中搜索【码路无涯】,期待你的到来