引入
caffenie作为目前本地缓存的首选,其内部设计思想有很多值得我们学习的地方。缓存中最主要的数据竞争源于读取数据的同时,也会伴随着对数据状态的写入操作;写入数据的同时,也会伴随着数据状态的读取操作。譬如,读取时要同时更新数据的最近访问时间和访问计数器的状态(当然caffeine为了追求高效,不会记录时间和次数,譬如通过调整链表顺序来表达时间先后、通过 Sketch 结构来表达热度高低),以实现缓存的淘汰策略;又或者读取时要同时判断数据的超期时间等信息,以实现失效重加载等其他扩展功能。对以上伴随读写操作而来的状态维护,有两种可选择的处理思路,一种是以 Guava Cache 为代表的同步处理机制,即在访问数据时一并完成缓存淘汰、统计、失效等状态变更操作,通过分段加锁等优化手段来尽量减少竞争。另一种是以 Caffeine 为代表的异步日志提交机制,这种机制参考了经典的数据库设计理论,将对数据的读、写过程看作是日志(即对数据的操作指令)的提交过程。尽管日志也涉及到写入操作,有并发的数据变更就必然面临锁竞争,但异步提交的日志已经将原本在 Map 内的锁转移到日志的追加写操作上,日志里腾挪优化的余地就比在 Map 中要大得多。
我们上文提到,对缓存的维护可以分为两个场景,一个是读取缓存时,另一个是向缓存写入数据时,这两个场景,caffeine都是提交日志异步处理的方式维护缓存的淘汰、统计、失效等。读取缓存和写入缓存两个场景还是有差异,本文是针对第一个场景读取缓存时异步去维护缓存的设计剖析。
问题
在异步维护(淘汰、统计、失效等)缓存过程中,需要先向一个中间队列去存储key的信息,然后再由消费者去读取队列中的元素,继而去维护(淘汰、统计、失效等)缓存的信息。那么,对这个中间队列的性能要求就会近乎苛刻,因为缓存意味着高并发,所以队列的并发写入速度要足够快。需要解决的问题大致体现在以下几个方面
1、因为队列要求写入速度,队列中的元素不断创建,可能会导致gc压力过大
2、因为队列存在多线程并发写入的情况,又要足够快,所以对队列的并发竞争会很大,我们需要想办法解决并发竞争的问题。
解决思路:
正常队列的数据结构就是数组,或者链表。java的jdk中,我们常用的队列有ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue。
1、针对第二个方面,我们需要解决并发竞争的问题,常规的解决思路有两条,一条是加锁保证临界资源的安全,还有一个就是原生的cas操作可以保证。加锁并不是最优的解决方案,在真正的高并发情况下,锁会带来性能的开销,所以我们其实是希望能不用锁的情况下就不用锁的,如果能用原生的cas操作是最好的。但是同样,用原生的cas操作,我们也需要解决cas操作在高并发下的恶性空自旋的问题,这个我们根据具体情况看是否可以采用分离热点的方式(具体可以参考高并发下的设计思想--分离热点)
这么看的话,ArrayBlockingQueue、LinkedBlockingQueue都是内部都是采用了lock锁的方式,并不合适。
2、再看ConcurrentLinkedQueue是无锁的,内部也是用cas操作保证的多线程安全。但是针对第一点,ConcurrentLinkedQueue 内部是链表实现,高并发情况下,链表的节点会被频繁的创建,这会给gc带来压力。那有没有一种数据结构,是以数组为基础,但是内部的内存空间又可以重复利用的呢?答案是环形队列,内部用数组实现就可以了;并且环形队列的好处不止于此,下文详细描述。
环形队列
环形队列是在实际编程极为有用的数据结构,它采用数组的线性空间,数据组织简单,能很快知道队列是否满或空,能以很快速度的来存取数据。
从顺时针看,环形队列 有队头 head 和队尾 tail。
- 生产的流程是:
生产者顺时针向队尾 tail 插入元素,这会导致 head 位置不变,tail 位置在后移; - 消费的流程是:
消费者则从队头 head 开始消费,这会导致 head 向后移动,而tail 位置不变,如果队列满了就不能写入。 - 环形队列的特点:
队头 head 和队尾 tail 的位置是不定的,位置一直在循环流动,空间就被重复利用起来了。因为有简单高效的原因,甚至在硬件都实现了环形队列.。环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用程序大量交换数据,从硬件接收大量数据)均使用了环形队列。
环形核心的结构和流程说明
- 约定head指向队列的第一个元素,
也就是说data[head]就是队头数据,head初始值为0。 - 约定tail指向队列的最后一个元素的后一个位置,
也就是说data[tail-1]就是队尾数据,tail初始值为0。 - 队列满的条件是:
( tail+1 )% maxSize == head - 队列空的条件是:
tail == head - 队列中的元素个数为:
( tail + maxsize - head) % maxSize - 有效数据只有maxSize-1个
因为tail指向队尾的后面一个位置,这个位置就不能存数据,因此有效数据只有maxSize-1个
至此,我们清楚我们的数据结构主体雏形--环形的数组队列,同时我们需要将队列采用分离热点的思想,将队列分散到一个数组中,数组可以存多个环形队列,将每个线程id对数组长度取模,然后映射到数组中的下标位置中,这样每个线程都有自己对应的环形数组,就可以极大地降低竞争。同样在消费队列元素时,消费数组中的多个环形队列中节点元素。
caffeine的设计
通过以上分析,再结合caffeine中对读缓异步队列的设计,我们可以看到,后者的设计思想和我们分析的基本一致,下面是其数据结构图:
在源码中的体现上,table[]数组在父类StripedBuffer里,父类提供入队和出队的钩子方法以及table数组初始化和扩容的具体实现。RingBuffer继承自StripedBuffer,其中包含了环形队列中节点元素值数组 AtomicReferenceArray,以及头结点和尾节点;提供了入队的具体实现,以及消费时批量出队的具体实现方法drainTo。
StripedBuffer数据结构:
//table数组的最大长度
static final int MAXIMUM_TABLE_SIZE
//buffer数组,数组里的元素是ringBuffer
transient volatile Buffer<E> @Nullable[] table
//扩容或者初始化数组的锁标识
transient volatile int tableBusy
需要注意的是,什么时候会进入扩容、初始化数组的操作中。在追加到RingBuffer如果失败了(存在竞争),是会进行扩容或者初始化逻辑的。并且在扩容初始化的逻辑中,会在扩容后再次尝试追加,如果连续三次都失败的话,那么这次写入就是失败的。所以ringbuffer的写入会存在失败的情况,不能保证强成功。不过在读缓存后异步入队列的场景中,这种偶发的失败是可以接受的。
/**
* 入队逻辑
*/
public int offer(E e) {
int mask;
int result = 0;
Buffer<E> buffer;
//是否不存在竞争
boolean uncontended = true;
Buffer<E>[] buffers = table;
//是否已经初始化
if ((buffers == null)
|| (mask = buffers.length - 1) < 0
////用thread的随机值作为hash值,得到对应位置的RingBuffer
|| (buffer = buffers[getProbe() & mask]) == null
//检查追加到RingBuffer是否成功,如果失败,会进行扩容初始化逻辑
|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
expandOrRetry(e, uncontended);
}
return result;
}
/**
* 批量出队逻辑
*/
public void drainTo(Consumer<E> consumer) {
Buffer<E>[] buffers = table;
if (buffers == null) {
return;
}
for (Buffer<E> buffer : buffers) {
if (buffer != null) {
//调用ringBuffer的出队方法
buffer.drainTo(consumer);
}
}
}
/**
* table数组扩容、初始化逻辑
*/
final void expandOrRetry(E e, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//默认重试3次,允许丢失
for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
Buffer<E>[] buffers;
Buffer<E> buffer;
int n;
//buffer数组不为空的情况
if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
//对应数组下标元素为空
if ((buffer = buffers[(n - 1) & h]) == null) {
if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
boolean created = false;
try { // Recheck under lock
Buffer<E>[] rs;
int mask, j;
if (((rs = table) != null) && ((mask = rs.length) > 0)
&& (rs[j = (mask - 1) & h] == null)) {
rs[j] = create(e);
created = true;
}
} finally {
tableBusy = 0;
}
if (created) {
break;
}
continue; // Slot is now non-empty
}
collide = false;
} else if (!wasUncontended) { // CAS already known to fail
wasUncontended = true; // Continue after rehash
}
//环形队列里追加元素是否成功,这里的offer是ringbuffer环形队列的入队
else if (buffer.offer(e) != Buffer.FAILED) {
break;
}
//数组长度是否大于最大长度,最大长度= 4*2的N次方(N=cpu核心数)
else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
collide = false; // At max size or stale
} else if (!collide) {
collide = true;
}
//真正的扩容逻辑(走到这步说明环形队列中添加元素失败,并发太高,此时正有其他线程也在这个buffer环形队列中添加元素,所以考虑扩容table数组的长度)
else if (tableBusy == 0 && casTableBusy()) {
try {
if (table == buffers) { // Expand table unless stale
table = Arrays.copyOf(buffers, n << 1);
}
} finally {
tableBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//buffer数组为空则初始化数组
else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
boolean init = false;
try { // Initialize table
if (table == buffers) {
@SuppressWarnings({"unchecked", "rawtypes"})
Buffer<E>[] rs = new Buffer[1];
rs[0] = create(e);
table = rs;
init = true;
}
} finally {
tableBusy = 0;
}
if (init) {
break;
}
}
}
}
RingBuffer的数据结构,这里需要注意的是,为了方便理解,罗列了部分主要属性,实际readCounter和writeCounter在计算时,是相对于各自原始位置的偏移量。
final class BoundedBuffer<E> extends StripedBuffer<E>{
static final class RingBuffer<E> extends BBHeader.ReadAndWriteCounterRef{
//引用类型原子数组(存放环形队列中的节点值)
final AtomicReferenceArray<E> buffer;
//读指针:相当于head
volatile long readCounter;
//写指针:相当于tail
volatile long writeCounter;
public RingBuffer(E e) {
//BUFFER_SIZE = 16
buffer = new AtomicReferenceArray<>(BUFFER_SIZE);
buffer.lazySet(0, e);
}
/**
* 入队
*/
public int offer(E e) {
long head = readCounter;
long tail = relaxedWriteCounter();
long size = (tail - head);
//判断环形队列是否已满,这里之所以可以用tail-head>=buffer_size 来判断是否队列已满,
//是因为这里的head和tail是相对于各自原始位置的偏移量
if (size >= BUFFER_SIZE) {
return Buffer.FULL;
}
//将tail指针向后移动一格
if (casWriteCounter(tail, tail + 1)) {
int index = (int) (tail & MASK);
//设置环形数组中响应位置元素
buffer.lazySet(index, e);
return Buffer.SUCCESS;
}
return Buffer.FAILED;
}
/**
* 批量出队
*/
public void drainTo(Consumer<E> consumer) {
long head = readCounter;
long tail = relaxedWriteCounter();
long size = (tail - head);
//环形队列是否已空
if (size == 0) {
return;
}
do {
int index = (int) (head & MASK);
E e = buffer.get(index);
if (e == null) {
// not published yet
break;
}
//元素出队,相应位置置为空
buffer.lazySet(index, null);
consumer.accept(e);
//head指针向后移动一格
head++;
} while (head != tail);
//批量设置head指针的偏移量
lazySetReadCounter(head);
}
}
}
总结:
caffeine在缓存读日志(读操作) 高并发写入队列场景中,使用了环形队列可以有效避免高并发场景下节点频繁创建带来的gc压力,同时可以边写边读(消费),提高了队列的吞吐量。并且采用原生cas操作和分离热点的方法,将线程对队列并发写的竞争分散到数组中的每一个队列中,有效降低了并发的竞争。