guavacache是google出的一个开源本地缓存库,适用于高并发场景。本文基于28.1-jre版本
基本特性
- 数据自动加载缓存中
- 缓存达到最大限制时,使用LRU算法进行回收
- 基于读/写自定义过期策略
- 缓存的keys自动用弱引用封装,利于GC
- key被回收或移除可收到通知
- 缓存的访问统计
数据结构
核心类:
CacheBuilder:缓存构造器及入口,可配置缓存参数,build模式;
CacheLoader:抽象类,用于向数据源获取数据,定义了load,reload,loadAll等操作;
Cache:接口,定义数据的get、put等操作
LoadingCache:接口,继承Cache,定义get、getUnchecked、getAll等操作;
LocalCache:缓存核心类,包括了缓存的数据结构和基本实现;
LocalManualCache:LocalCache内部静态类,实现Cache接口。内部的增删改查实际由LocalCache实现;
数据结构图
读写队列
// 最近访问过的数据会在此队列更新,使用ConcurrentLinkedQueue数据结构,此队列记录 get 命中操作的,多线程操作
final Queue<ReferenceEntry<K, V>> recencyQueue;
// 发生写操作后会将entry加入队列的尾部,以写入时间排序
@GuardedBy("this")
final Queue<ReferenceEntry<K, V>> writeQueue;
// 发生读操作后会将entry加入队列的尾部,以访问时间排序
@GuardedBy("this")
final Queue<ReferenceEntry<K, V>> accessQueue;
recencyQueue =
// 有设置expireAfterAccess或最大容量则为true
map.usesAccessQueue()
? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>()
: LocalCache.<ReferenceEntry<K, V>>discardingQueue();
writeQueue =
// 有设置expireAfterWrite则为true
map.usesWriteQueue()
? new WriteQueue<K, V>()
: LocalCache.<ReferenceEntry<K, V>>discardingQueue();
accessQueue =
map.usesAccessQueue()
? new AccessQueue<K, V>()
: LocalCache.<ReferenceEntry<K, V>>discardingQueue();
源码分析
CacheBuilder构造器
private static final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(10)
.refreshAfterWrite(10, TimeUnit.SECONDS)
//.expireAfterWrite(1, TimeUnit.SECONDS)
//.expireAfterAccess(1,TimeUnit.SECONDS)
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
String value = key + ":load-" + new Random().nextInt(10);
return value;
}
});
LocalCache.get方法
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
// hash以及分段和并发hashmap差不多,就不多说了
int hash = hash(checkNotNull(key));
return segmentFor(hash).get(key, hash, loader);
}
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// 获取能匹配到的entry
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
// 获取该entry可用的值,即key和value不为null,且在有效期内
V value = getLiveValue(e, now);
if (value != null) {
//更新最近访问队列,若有读有效期,同时更新读时间
recordRead(e, now);
statsCounter.recordHits(1);
//这里会再判断是否刷新取最新的值,否,则返回旧值即value,是则刷新重新reload新值
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
// 若value=null且正在加载数据中,则等待(最终会调用future.get)
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// at this point e is either null or expired;
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
//略
} finally {
postReadCleanup();
}
}
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
// 内部类Segment继承了ReentrantLock
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
//预清理,包括reference类型的key和value处理、key的读写有效期处理,会有加锁操作
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
// 获取index所在的第一个链表结点
ReferenceEntry<K, V> first = table.get(index);
// 遍历链表
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
// 如果正有其他线程在加载要查询的值,则createNewEntry = false
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
// value==null表示值为空,需要移除
if (value == null) {
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accommodate an incorrect expiration queue.
// 双重检查是否过期
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {
// 记录本次读
recordLockedRead(e, now);
statsCounter.recordHits(1);
// 返回值
return value;
}
//若查询出来的节点为空或过期,则从读写队列中移除
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
//createNewEntry=true表示需要创建新结点
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();
// 创建结点loadingValueReference,此时真正的值还未加载
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
// 处理删除通知队列数据
postWriteCleanup();
}
//createNewEntry=true表示需要创建新结点
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
V loadSync(
K key,
int hash,
LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader)
throws ExecutionException {
// 从loadingValueReference获取数据,返回为ListenableFuture
ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
// 获取数据并统计记录状态
return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}
// 此方法在scheduleRefresh方法中也会调用
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
try {
stopwatch.start();
// 获取旧值,像第一次获取不存在的数据时,此值为空
V previousValue = oldValue.get();
if (previousValue == null) {
// 加载数据,最终会调用自定义的加载方法
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
// 调用reload方法
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// 将新值写入LoadingValueReference中
return transform(
newValue,
new com.google.common.base.Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
},
directExecutor());
} catch (Throwable t) {
// 略
}
}
// getAndRecordStats方法最终会调用storeLoadedValue
boolean storeLoadedValue(
K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) {
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count + 1;
// 如果segment中数据的长度大于阈值,则扩容
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
// replace the old LoadingValueReference if it's live, otherwise
// perform a putIfAbsent
if (oldValueReference == valueReference
|| (entryValue == null && valueReference != UNSET)) {
++modCount;
// 旧值是活跃的,则加入删除通知队列中
if (oldValueReference.isActive()) {
RemovalCause cause =
(entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;
enqueueNotification(key, hash, entryValue, oldValueReference.getWeight(), cause);
newCount--;
}
setValue(e, key, newValue, now);
this.count = newCount; // write-volatile
// 判断是否需要驱逐,其中e为最新的值
evictEntries(e);
return true;
}
// the loaded value was already clobbered
enqueueNotification(key, hash, newValue, 0, RemovalCause.REPLACED);
return false;
}
}
// 链表为空时的处理
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, newValue, now);
table.set(index, newEntry);
this.count = newCount; // write-volatile
evictEntries(newEntry);
return true;
} finally {
unlock();
postWriteCleanup();
}
}
整个get过程挺复杂的,一个个来说吧。假设想要get的key在cache中不存在,那么步骤是这样的:
- 根据key的hash找到segment;
- 根据key的hash找到table中的下标index;
- 若index所在链表为空,则新建一个结点(此时结点是loadingValueReference,还没有加载数据),并放入table中;
- 加锁调用loadSync方法,最终会调用LoadingValueReference#loadFuture方法;
- 由于该key是第一次加载,旧值为空,直接调用load方法加载新值到LoadingValueReference中;
- 记录本次操作到读和写队列中,判断是否超出容量以及相关逐出操作,然后返回key对应的新值。
如果get的key在cache中已存在,步骤如下:
前几个找key的步骤一样
- 获取key所对应的ReferenceEntry,并判断是否有效(getLiveValue(e, now));
- 若有效,则调用scheduleRefresh方法,此方法会再判断是否过期 && 是否正在加载,否则直接返回旧值,是则刷新重新load新值;
- 若无效,判断是否正在加载,是则等待新值加载完毕后返回新值。
容量限制
guava是使用accessQueue、recencyQueue队列来实现LRU,其中recencyQueue底层使用的是java的ConcurrentLinkedQueue,按访问顺序进行排序。accessQueue和writeQueue是自定义实现,非并发。
执行因容量限制而逐出的代码:
void evictEntries(ReferenceEntry<K, V> newest) {
if (!map.evictsBySize()) {
return;
}
//批量将recencyQueue队列数据放入accessQueue
drainRecencyQueue();
// 当valueReference的权重大于这个maxSegmentWeight,则这个权重太大,已经超过段允许的最大权重了,直接驱逐这个新值。这里maxSegmentWeight = maxWeight / segmentCount (如果maxWeight % segmentCount大于段的数量,则maxSegmentWeight再加1,见初始化代码)。
if (newest.getValueReference().getWeight() > maxSegmentWeight) {
if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
//当段的总权重大于maxSegmentWeight,则根据accessQueue队列的数据进行逐出直到满足条件为止
while (totalWeight > maxSegmentWeight) {
ReferenceEntry<K, V> e = getNextEvictable();
if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
}
这个的权重大家理解为1即可,初始化时maxWeight=maximumSize,然后会对每个段分配最大的maxSegmentWeight,如果某个段数据大小>maxSegmentWeight,则按LRU进行驱逐。所以这里不是严格的按maximumSize来驱逐的。
guavacache支持按权重进行设置大小,可以对不同的key进行设置不同的权重,比如key=a权重为1,key=b权重为2。maximumWeight和weigher配合一起使用。
这里recencyQueue的作用是缓存最近访问的get命中操作的key,然后若某个线程获取了段的锁,会批量将recencyQueue刷到非线程安全的accessQueue中,最终用accessQueue来判断最近访问的key。相比用一个读队列提升?
writeQueue是记录写操作key,当设置了expireAfterWrite时生效。如果一个获取一个key已经写过期了,那么会在加载前从writeQueue中移除。
以accessQueue为例
static final class AccessQueue<K, V> extends AbstractQueue<ReferenceEntry<K, V>> {
final ReferenceEntry<K, V> head =
new AbstractReferenceEntry<K, V>() {
@Override
public long getAccessTime() {
return Long.MAX_VALUE;
}
@Override
public void setAccessTime(long time) {}
ReferenceEntry<K, V> nextAccess = this;
@Override
public ReferenceEntry<K, V> getNextInAccessQueue() {
return nextAccess;
}
@Override
public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
this.nextAccess = next;
}
ReferenceEntry<K, V> previousAccess = this;
@Override
public ReferenceEntry<K, V> getPreviousInAccessQueue() {
return previousAccess;
}
@Override
public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
this.previousAccess = previous;
}
};
// implements Queue
@Override
public boolean offer(ReferenceEntry<K, V> entry) {
// unlink
connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());
// add to tail
connectAccessOrder(head.getPreviousInAccessQueue(), entry);
connectAccessOrder(entry, head);
return true;
}
// 其他方法略
重点看下offer方法,主要做以下事情:
1.将Entry和它的前节点后节点的关联断开,需要Entry中维护它的前向和后向引用。
2.将新增加的节点加入到队列的尾部,寻找尾节点用了head.getPreviousInAccessQueue()。
3.将新增加的节点设为尾部节点。
这样最近更新的节点一定在尾部,逐出时只需删除前面的节点即可。
writeQueue同理
refreshAfterWrite和expireAfterWrite区别
主要区别在于当一个key过期时,refreshAfterWrite会有一个最新访问的线程进行加载新值,而其他后来访问的线程在加载完毕前直接返回旧值,因此refreshAfterWrite是不能严格保证key在指定时间后过期的;而expireAfterWrite在加载新值返回前,其他线程会等待直到新值加载完毕,并且expireAfterWrite会记录writeQueue,因此性能较差。
writeQueue用处
LRU使用的是accessQueue、recencyQueue,而writeQueue在设置了expireAfterWrite后使用,虽然在写和清理操作中维护了writeQueue,但感觉不是必要的,因为完全可以在判断expireAfterWrite过期后,直接删除旧值。而实际实现在判断key过期后,会遍历writeQueue,将所有过期的都删除,这样性能感觉不会更高,难道是省空间?可未免代价有点大。
引用类型的使用
这个暂不写了,有空再说