ringbuffer(环形队列)在caffeine中的设计

引入

caffenie作为目前本地缓存的首选,其内部设计思想有很多值得我们学习的地方。缓存中最主要的数据竞争源于读取数据的同时,也会伴随着对数据状态的写入操作;写入数据的同时,也会伴随着数据状态的读取操作。譬如,读取时要同时更新数据的最近访问时间和访问计数器的状态(当然caffeine为了追求高效,不会记录时间和次数,譬如通过调整链表顺序来表达时间先后、通过 Sketch 结构来表达热度高低),以实现缓存的淘汰策略;又或者读取时要同时判断数据的超期时间等信息,以实现失效重加载等其他扩展功能。对以上伴随读写操作而来的状态维护,有两种可选择的处理思路,一种是以 Guava Cache 为代表的同步处理机制,即在访问数据时一并完成缓存淘汰、统计、失效等状态变更操作,通过分段加锁等优化手段来尽量减少竞争。另一种是以 Caffeine 为代表的异步日志提交机制,这种机制参考了经典的数据库设计理论,将对数据的读、写过程看作是日志(即对数据的操作指令)的提交过程。尽管日志也涉及到写入操作,有并发的数据变更就必然面临锁竞争,但异步提交的日志已经将原本在 Map 内的锁转移到日志的追加写操作上,日志里腾挪优化的余地就比在 Map 中要大得多。
我们上文提到,对缓存的维护可以分为两个场景,一个是读取缓存时,另一个是向缓存写入数据时,这两个场景,caffeine都是提交日志异步处理的方式维护缓存的淘汰、统计、失效等。读取缓存和写入缓存两个场景还是有差异,本文是针对第一个场景读取缓存时异步去维护缓存的设计剖析。

问题

在异步维护(淘汰、统计、失效等)缓存过程中,需要先向一个中间队列去存储key的信息,然后再由消费者去读取队列中的元素,继而去维护(淘汰、统计、失效等)缓存的信息。那么,对这个中间队列的性能要求就会近乎苛刻,因为缓存意味着高并发,所以队列的并发写入速度要足够快。需要解决的问题大致体现在以下几个方面
1、因为队列要求写入速度,队列中的元素不断创建,可能会导致gc压力过大
2、因为队列存在多线程并发写入的情况,又要足够快,所以对队列的并发竞争会很大,我们需要想办法解决并发竞争的问题。

解决思路:

正常队列的数据结构就是数组,或者链表。java的jdk中,我们常用的队列有ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue。
1、针对第二个方面,我们需要解决并发竞争的问题,常规的解决思路有两条,一条是加锁保证临界资源的安全,还有一个就是原生的cas操作可以保证。加锁并不是最优的解决方案,在真正的高并发情况下,锁会带来性能的开销,所以我们其实是希望能不用锁的情况下就不用锁的,如果能用原生的cas操作是最好的。但是同样,用原生的cas操作,我们也需要解决cas操作在高并发下的恶性空自旋的问题,这个我们根据具体情况看是否可以采用分离热点的方式(具体可以参考高并发下的设计思想--分离热点
这么看的话,ArrayBlockingQueue、LinkedBlockingQueue都是内部都是采用了lock锁的方式,并不合适。
2、再看ConcurrentLinkedQueue是无锁的,内部也是用cas操作保证的多线程安全。但是针对第一点,ConcurrentLinkedQueue 内部是链表实现,高并发情况下,链表的节点会被频繁的创建,这会给gc带来压力。那有没有一种数据结构,是以数组为基础,但是内部的内存空间又可以重复利用的呢?答案是环形队列,内部用数组实现就可以了;并且环形队列的好处不止于此,下文详细描述。

环形队列

环形队列是在实际编程极为有用的数据结构,它采用数组的线性空间,数据组织简单,能很快知道队列是否满或空,能以很快速度的来存取数据。
从顺时针看,环形队列 有队头 head 和队尾 tail。


环形队列
  • 生产的流程是:
    生产者顺时针向队尾 tail 插入元素,这会导致 head 位置不变,tail 位置在后移;
  • 消费的流程是:
    消费者则从队头 head 开始消费,这会导致 head 向后移动,而tail 位置不变,如果队列满了就不能写入。
  • 环形队列的特点:
    队头 head 和队尾 tail 的位置是不定的,位置一直在循环流动,空间就被重复利用起来了。因为有简单高效的原因,甚至在硬件都实现了环形队列.。环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用程序大量交换数据,从硬件接收大量数据)均使用了环形队列。

环形核心的结构和流程说明

  1. 约定head指向队列的第一个元素,
    也就是说data[head]就是队头数据,head初始值为0。
  2. 约定tail指向队列的最后一个元素的后一个位置,
    也就是说data[tail-1]就是队尾数据,tail初始值为0。
  3. 队列满的条件是:
    ( tail+1 )% maxSize == head
  4. 队列空的条件是:
    tail == head
  5. 队列中的元素个数为:
    ( tail + maxsize - head) % maxSize
  6. 有效数据只有maxSize-1个
    因为tail指向队尾的后面一个位置,这个位置就不能存数据,因此有效数据只有maxSize-1个

至此,我们清楚我们的数据结构主体雏形--环形的数组队列,同时我们需要将队列采用分离热点的思想,将队列分散到一个数组中,数组可以存多个环形队列,将每个线程id对数组长度取模,然后映射到数组中的下标位置中,这样每个线程都有自己对应的环形数组,就可以极大地降低竞争。同样在消费队列元素时,消费数组中的多个环形队列中节点元素。

caffeine的设计

通过以上分析,再结合caffeine中对读缓异步队列的设计,我们可以看到,后者的设计思想和我们分析的基本一致,下面是其数据结构图:

环形队列

在源码中的体现上,table[]数组在父类StripedBuffer里,父类提供入队和出队的钩子方法以及table数组初始化和扩容的具体实现。RingBuffer继承自StripedBuffer,其中包含了环形队列中节点元素值数组 AtomicReferenceArray,以及头结点和尾节点;提供了入队的具体实现,以及消费时批量出队的具体实现方法drainTo。

StripedBuffer数据结构:

//table数组的最大长度
static final int MAXIMUM_TABLE_SIZE
//buffer数组,数组里的元素是ringBuffer
transient volatile Buffer<E> @Nullable[] table
//扩容或者初始化数组的锁标识
transient volatile int tableBusy

需要注意的是,什么时候会进入扩容、初始化数组的操作中。在追加到RingBuffer如果失败了(存在竞争),是会进行扩容或者初始化逻辑的。并且在扩容初始化的逻辑中,会在扩容后再次尝试追加,如果连续三次都失败的话,那么这次写入就是失败的。所以ringbuffer的写入会存在失败的情况,不能保证强成功。不过在读缓存后异步入队列的场景中,这种偶发的失败是可以接受的。

   /**
    * 入队逻辑
    */
  public int offer(E e) {
    int mask;
    int result = 0;
    Buffer<E> buffer;
    //是否不存在竞争
    boolean uncontended = true;
    Buffer<E>[] buffers = table;
    //是否已经初始化
    if ((buffers == null)
        || (mask = buffers.length - 1) < 0
        ////用thread的随机值作为hash值,得到对应位置的RingBuffer
        || (buffer = buffers[getProbe() & mask]) == null
        //检查追加到RingBuffer是否成功,如果失败,会进行扩容初始化逻辑
        || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
      expandOrRetry(e, uncontended);
    }
    return result;
  }

  /**
    * 批量出队逻辑
    */
  public void drainTo(Consumer<E> consumer) {
    Buffer<E>[] buffers = table;
    if (buffers == null) {
      return;
    }
    for (Buffer<E> buffer : buffers) {
      if (buffer != null) {
        //调用ringBuffer的出队方法
        buffer.drainTo(consumer);
      }
    }
  }

  /**
    * table数组扩容、初始化逻辑
    */
  final void expandOrRetry(E e, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
      ThreadLocalRandom.current(); // force initialization
      h = getProbe();
      wasUncontended = true;
    }
    boolean collide = false; // True if last slot nonempty
    //默认重试3次,允许丢失
    for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
      Buffer<E>[] buffers;
      Buffer<E> buffer;
      int n;
      //buffer数组不为空的情况
      if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
        //对应数组下标元素为空
        if ((buffer = buffers[(n - 1) & h]) == null) {
          if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
            boolean created = false;
            try { // Recheck under lock
              Buffer<E>[] rs;
              int mask, j;
              if (((rs = table) != null) && ((mask = rs.length) > 0)
                  && (rs[j = (mask - 1) & h] == null)) {
                rs[j] = create(e);
                created = true;
              }
            } finally {
              tableBusy = 0;
            }
            if (created) {
              break;
            }
            continue; // Slot is now non-empty
          }
          collide = false;
        } else if (!wasUncontended) { // CAS already known to fail
          wasUncontended = true;      // Continue after rehash
        } 
        //环形队列里追加元素是否成功,这里的offer是ringbuffer环形队列的入队
        else if (buffer.offer(e) != Buffer.FAILED) {
          break;
        } 
        //数组长度是否大于最大长度,最大长度= 4*2的N次方(N=cpu核心数)
        else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
          collide = false; // At max size or stale
        } else if (!collide) {
          collide = true;
        } 
        //真正的扩容逻辑(走到这步说明环形队列中添加元素失败,并发太高,此时正有其他线程也在这个buffer环形队列中添加元素,所以考虑扩容table数组的长度)
        else if (tableBusy == 0 && casTableBusy()) {
          try {
            if (table == buffers) { // Expand table unless stale
              table = Arrays.copyOf(buffers, n << 1);
            }
          } finally {
            tableBusy = 0;
          }
          collide = false;
          continue; // Retry with expanded table
        }
        h = advanceProbe(h);
      } 
      //buffer数组为空则初始化数组
      else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
        boolean init = false;
        try { // Initialize table
          if (table == buffers) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            Buffer<E>[] rs = new Buffer[1];
            rs[0] = create(e);
            table = rs;
            init = true;
          }
        } finally {
          tableBusy = 0;
        }
        if (init) {
          break;
        }
      }
    }
  }

RingBuffer的数据结构,这里需要注意的是,为了方便理解,罗列了部分主要属性,实际readCounter和writeCounter在计算时,是相对于各自原始位置的偏移量。

final class BoundedBuffer<E> extends StripedBuffer<E>{

    static final class RingBuffer<E> extends BBHeader.ReadAndWriteCounterRef{
        //引用类型原子数组(存放环形队列中的节点值)
        final AtomicReferenceArray<E> buffer;
        //读指针:相当于head
        volatile long readCounter;
        //写指针:相当于tail
        volatile long writeCounter;

        public RingBuffer(E e) {
             //BUFFER_SIZE = 16
             buffer = new AtomicReferenceArray<>(BUFFER_SIZE);
             buffer.lazySet(0, e);
        }

        /**
         * 入队
        */
        public int offer(E e) {
          long head = readCounter;
          long tail = relaxedWriteCounter();
          long size = (tail - head);
          //判断环形队列是否已满,这里之所以可以用tail-head>=buffer_size 来判断是否队列已满,
          //是因为这里的head和tail是相对于各自原始位置的偏移量
          if (size >= BUFFER_SIZE) {
            return Buffer.FULL;
          }
          //将tail指针向后移动一格
          if (casWriteCounter(tail, tail + 1)) {
            int index = (int) (tail & MASK);
            //设置环形数组中响应位置元素
            buffer.lazySet(index, e);
            return Buffer.SUCCESS;
          }
          return Buffer.FAILED;
        }
        
        /**
          * 批量出队
         */
        public void drainTo(Consumer<E> consumer) {
           long head = readCounter;
           long tail = relaxedWriteCounter();
           long size = (tail - head);
           //环形队列是否已空
           if (size == 0) {
             return;
           }
           do {
             int index = (int) (head & MASK);
             E e = buffer.get(index);
             if (e == null) {
               // not published yet
               break;
             }
             //元素出队,相应位置置为空
             buffer.lazySet(index, null);
             consumer.accept(e);
             //head指针向后移动一格
             head++;
           } while (head != tail);
           //批量设置head指针的偏移量
           lazySetReadCounter(head);
         }
    }


}

总结:

caffeine在缓存读日志(读操作) 高并发写入队列场景中,使用了环形队列可以有效避免高并发场景下节点频繁创建带来的gc压力,同时可以边写边读(消费),提高了队列的吞吐量。并且采用原生cas操作和分离热点的方法,将线程对队列并发写的竞争分散到数组中的每一个队列中,有效降低了并发的竞争。

引用
凤凰架构-服务器端缓存
caffeine源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容