Guavacache源码分析

guavacache是google出的一个开源本地缓存库,适用于高并发场景。本文基于28.1-jre版本

基本特性

  1. 数据自动加载缓存中
  2. 缓存达到最大限制时,使用LRU算法进行回收
  3. 基于读/写自定义过期策略
  4. 缓存的keys自动用弱引用封装,利于GC
  5. key被回收或移除可收到通知
  6. 缓存的访问统计

数据结构

核心类:

CacheBuilder:缓存构造器及入口,可配置缓存参数,build模式;
CacheLoader:抽象类,用于向数据源获取数据,定义了load,reload,loadAll等操作;
Cache:接口,定义数据的get、put等操作
LoadingCache:接口,继承Cache,定义get、getUnchecked、getAll等操作;
LocalCache:缓存核心类,包括了缓存的数据结构和基本实现;
LocalManualCache:LocalCache内部静态类,实现Cache接口。内部的增删改查实际由LocalCache实现;

数据结构图

image.png

读写队列

    // 最近访问过的数据会在此队列更新,使用ConcurrentLinkedQueue数据结构,此队列记录 get 命中操作的,多线程操作
    final Queue<ReferenceEntry<K, V>> recencyQueue;

    // 发生写操作后会将entry加入队列的尾部,以写入时间排序
    @GuardedBy("this")
    final Queue<ReferenceEntry<K, V>> writeQueue;

   // 发生读操作后会将entry加入队列的尾部,以访问时间排序
    @GuardedBy("this")
    final Queue<ReferenceEntry<K, V>> accessQueue;

recencyQueue =
          // 有设置expireAfterAccess或最大容量则为true
          map.usesAccessQueue()
              ? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>()
              : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

      writeQueue =
          // 有设置expireAfterWrite则为true
          map.usesWriteQueue()
              ? new WriteQueue<K, V>()
              : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

      accessQueue =
          map.usesAccessQueue()
              ? new AccessQueue<K, V>()
              : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

源码分析

CacheBuilder构造器

private static final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(10)
            .refreshAfterWrite(10, TimeUnit.SECONDS)
            //.expireAfterWrite(1, TimeUnit.SECONDS)
            //.expireAfterAccess(1,TimeUnit.SECONDS)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    String value = key + ":load-" + new Random().nextInt(10);
                    return value;
                }

            });

LocalCache.get方法

V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
    // hash以及分段和并发hashmap差不多,就不多说了

    int hash = hash(checkNotNull(key));
    return segmentFor(hash).get(key, hash, loader);
  }

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // 获取能匹配到的entry
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();
            // 获取该entry可用的值,即key和value不为null,且在有效期内
            V value = getLiveValue(e, now);
            if (value != null) {
              //更新最近访问队列,若有读有效期,同时更新读时间
              recordRead(e, now);
              statsCounter.recordHits(1);
              //这里会再判断是否刷新取最新的值,否,则返回旧值即value,是则刷新重新reload新值
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference<K, V> valueReference = e.getValueReference();
            // 若value=null且正在加载数据中,则等待(最终会调用future.get)
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // at this point e is either null or expired;
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        //略
      } finally {
        postReadCleanup();
      }
    }

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;
      // 内部类Segment继承了ReentrantLock
      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        //预清理,包括reference类型的key和value处理、key的读写有效期处理,会有加锁操作
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        // 获取index所在的第一个链表结点
        ReferenceEntry<K, V> first = table.get(index);
        // 遍历链表
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            // 如果正有其他线程在加载要查询的值,则createNewEntry = false
            if (valueReference.isLoading()) {
              createNewEntry = false;
            } else {
              V value = valueReference.get();
              // value==null表示值为空,需要移除
              if (value == null) {
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) {
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accommodate an incorrect expiration queue.
               // 双重检查是否过期
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
              } else {
                // 记录本次读
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // 返回值
                return value;
              }
               //若查询出来的节点为空或过期,则从读写队列中移除 
              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }
        
        //createNewEntry=true表示需要创建新结点
        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference<>();
          // 创建结点loadingValueReference,此时真正的值还未加载
          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        // 处理删除通知队列数据
        postWriteCleanup();
      }
      //createNewEntry=true表示需要创建新结点
      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
      }
    }

V loadSync(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader)
        throws ExecutionException {
      // 从loadingValueReference获取数据,返回为ListenableFuture
      ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      // 获取数据并统计记录状态
      return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
    }
// 此方法在scheduleRefresh方法中也会调用
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      try {
        stopwatch.start();
        // 获取旧值,像第一次获取不存在的数据时,此值为空
        V previousValue = oldValue.get();
        if (previousValue == null) {
          // 加载数据,最终会调用自定义的加载方法
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        // 调用reload方法
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // 将新值写入LoadingValueReference中
        return transform(
            newValue,
            new com.google.common.base.Function<V, V>() {
              @Override
              public V apply(V newValue) {
                LoadingValueReference.this.set(newValue);
                return newValue;
              }
            },
            directExecutor());
      } catch (Throwable t) {
        // 略
      }
    }
// getAndRecordStats方法最终会调用storeLoadedValue
boolean storeLoadedValue(
        K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) {
      lock();
      try {
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count + 1;
        // 如果segment中数据的长度大于阈值,则扩容
        if (newCount > this.threshold) { // ensure capacity
          expand();
          newCount = this.count + 1;
        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            ValueReference<K, V> valueReference = e.getValueReference();
            V entryValue = valueReference.get();
            // replace the old LoadingValueReference if it's live, otherwise
            // perform a putIfAbsent
            if (oldValueReference == valueReference
                || (entryValue == null && valueReference != UNSET)) {
              ++modCount;
              // 旧值是活跃的,则加入删除通知队列中
              if (oldValueReference.isActive()) {
                RemovalCause cause =
                    (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;
                enqueueNotification(key, hash, entryValue, oldValueReference.getWeight(), cause);
                newCount--;
              }
              setValue(e, key, newValue, now);
              this.count = newCount; // write-volatile
              // 判断是否需要驱逐,其中e为最新的值
              evictEntries(e);
              return true;
            }

            // the loaded value was already clobbered
            enqueueNotification(key, hash, newValue, 0, RemovalCause.REPLACED);
            return false;
          }
        }
        // 链表为空时的处理
        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, newValue, now);
        table.set(index, newEntry);
        this.count = newCount; // write-volatile
        evictEntries(newEntry);
        return true;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }

整个get过程挺复杂的,一个个来说吧。假设想要get的key在cache中不存在,那么步骤是这样的:

  1. 根据key的hash找到segment;
  2. 根据key的hash找到table中的下标index;
  3. 若index所在链表为空,则新建一个结点(此时结点是loadingValueReference,还没有加载数据),并放入table中;
  4. 加锁调用loadSync方法,最终会调用LoadingValueReference#loadFuture方法;
  5. 由于该key是第一次加载,旧值为空,直接调用load方法加载新值到LoadingValueReference中;
  6. 记录本次操作到读和写队列中,判断是否超出容量以及相关逐出操作,然后返回key对应的新值。

如果get的key在cache中已存在,步骤如下:
前几个找key的步骤一样

  1. 获取key所对应的ReferenceEntry,并判断是否有效(getLiveValue(e, now));
  2. 若有效,则调用scheduleRefresh方法,此方法会再判断是否过期 && 是否正在加载,否则直接返回旧值,是则刷新重新load新值;
  3. 若无效,判断是否正在加载,是则等待新值加载完毕后返回新值。

容量限制

guava是使用accessQueue、recencyQueue队列来实现LRU,其中recencyQueue底层使用的是java的ConcurrentLinkedQueue,按访问顺序进行排序。accessQueue和writeQueue是自定义实现,非并发。
执行因容量限制而逐出的代码:

void evictEntries(ReferenceEntry<K, V> newest) {
      if (!map.evictsBySize()) {
        return;
      }
      //批量将recencyQueue队列数据放入accessQueue
      drainRecencyQueue();

      // 当valueReference的权重大于这个maxSegmentWeight,则这个权重太大,已经超过段允许的最大权重了,直接驱逐这个新值。这里maxSegmentWeight = maxWeight / segmentCount (如果maxWeight % segmentCount大于段的数量,则maxSegmentWeight再加1,见初始化代码)。
      if (newest.getValueReference().getWeight() > maxSegmentWeight) {
        if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }
      //当段的总权重大于maxSegmentWeight,则根据accessQueue队列的数据进行逐出直到满足条件为止
      while (totalWeight > maxSegmentWeight) {
        ReferenceEntry<K, V> e = getNextEvictable();
        if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }
    }

这个的权重大家理解为1即可,初始化时maxWeight=maximumSize,然后会对每个段分配最大的maxSegmentWeight,如果某个段数据大小>maxSegmentWeight,则按LRU进行驱逐。所以这里不是严格的按maximumSize来驱逐的。
guavacache支持按权重进行设置大小,可以对不同的key进行设置不同的权重,比如key=a权重为1,key=b权重为2。maximumWeight和weigher配合一起使用。

这里recencyQueue的作用是缓存最近访问的get命中操作的key,然后若某个线程获取了段的锁,会批量将recencyQueue刷到非线程安全的accessQueue中,最终用accessQueue来判断最近访问的key。相比用一个读队列提升?
writeQueue是记录写操作key,当设置了expireAfterWrite时生效。如果一个获取一个key已经写过期了,那么会在加载前从writeQueue中移除。

以accessQueue为例

static final class AccessQueue<K, V> extends AbstractQueue<ReferenceEntry<K, V>> {
    final ReferenceEntry<K, V> head =
        new AbstractReferenceEntry<K, V>() {

          @Override
          public long getAccessTime() {
            return Long.MAX_VALUE;
          }

          @Override
          public void setAccessTime(long time) {}

          ReferenceEntry<K, V> nextAccess = this;

          @Override
          public ReferenceEntry<K, V> getNextInAccessQueue() {
            return nextAccess;
          }

          @Override
          public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
            this.nextAccess = next;
          }

          ReferenceEntry<K, V> previousAccess = this;

          @Override
          public ReferenceEntry<K, V> getPreviousInAccessQueue() {
            return previousAccess;
          }

          @Override
          public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
            this.previousAccess = previous;
          }
        };

    // implements Queue

    @Override
    public boolean offer(ReferenceEntry<K, V> entry) {
      // unlink
      connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());

      // add to tail
      connectAccessOrder(head.getPreviousInAccessQueue(), entry);
      connectAccessOrder(entry, head);

      return true;
    }
// 其他方法略

重点看下offer方法,主要做以下事情:
1.将Entry和它的前节点后节点的关联断开,需要Entry中维护它的前向和后向引用。
2.将新增加的节点加入到队列的尾部,寻找尾节点用了head.getPreviousInAccessQueue()。
3.将新增加的节点设为尾部节点。
这样最近更新的节点一定在尾部,逐出时只需删除前面的节点即可。
writeQueue同理

refreshAfterWrite和expireAfterWrite区别

主要区别在于当一个key过期时,refreshAfterWrite会有一个最新访问的线程进行加载新值,而其他后来访问的线程在加载完毕前直接返回旧值,因此refreshAfterWrite是不能严格保证key在指定时间后过期的;而expireAfterWrite在加载新值返回前,其他线程会等待直到新值加载完毕,并且expireAfterWrite会记录writeQueue,因此性能较差。

writeQueue用处

LRU使用的是accessQueue、recencyQueue,而writeQueue在设置了expireAfterWrite后使用,虽然在写和清理操作中维护了writeQueue,但感觉不是必要的,因为完全可以在判断expireAfterWrite过期后,直接删除旧值。而实际实现在判断key过期后,会遍历writeQueue,将所有过期的都删除,这样性能感觉不会更高,难道是省空间?可未免代价有点大。

引用类型的使用

这个暂不写了,有空再说

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351