1、Redis 的过期键是如何删除的?
按官方的解释,有主动和被动两种策略
策略 | 优势 | 劣势 |
---|---|---|
主动删除 | 减少了对CPU和内存的影响 | 难以确定操作执行的时长和频率 |
被动删除 | CPU友好 | 内存不友好 |
- redis删除过期键采用了惰性删除和定期删除相结合的策略,惰性删除则是在每次GET/SET操作时去删,定期删除,则是在时间事件中,从整个key空间随机取样,直到过期键比率小于25%,如果同时有大量key过期的话,极可能导致主线程阻塞。一般可以通过做散列来优化处理。
- 在每一个定期删除循环中,Redis 会遍历 DB。如果这个 DB 完全没有设置了过期时间的 key,那就直接跳过。否则就针对这个 DB 抽一批 key,如果 key 已经过期,就直接删除。 如果在这一批 key 里面,过期的比例太低,那么就会中断循环,遍历下一个 DB。如果执行时间超过了阈值,也会中断。不过这个中断是整个中断,下一次定期删除的时候会从当前 DB 的下一个继续遍历。 总的来说,Redis 是通过控制执行定期删除循环时间来控制开销,这样可以在服务正常请求和清理过期 key 之间取得平衡。
- 具体参考另一篇详细介绍:Redis 的过期键是如何删除的
2、Redis 的淘汰策略有哪些?
当Redis的内存空间已经用满时,Redis将根据配置的淘汰策略(maxmemory-policy),进行相应的动作。某司Redis的淘汰策略共分为以下六种,默认no-eviction:
- no-eviction:不删除策略,当达到最大内存限制时,如果还需要更多的内存:直接返回错误。
- allkeys-lru:当达到最大内存限制时,如果还需要更多的内存:在所有的key中,挑选最近最少使用(LRU)的key淘汰
- volatile-lru:当达到最大内存限制时,如果还需要更多的内存:在设置了expire(过期时间)的key中,挑选最近最少使用(LRU)的key淘汰
- allkeys-random:当达到最大内存限制时,如果还需要更多的内存:在所有的key中,随机淘汰部分key
- volatile-random:当达到最大内存限制时,如果还需要更多的内存:在设置了expire(过期时间)的key中,随机淘汰部分key
- volatile-ttl:当达到最大内存限制时,如果还需要更多的内存:在设置了expire(过期时间)的key中,挑选TTL(time to live,剩余时间)短的key淘汰
3、常见的缓存模式有哪些,优缺点?
3.1 Cache Aside
这种模式通常是平时应用最广泛的一种模式,没有单独的缓存维护组件,缓存和db的读写操作由应用方负责,对于读写请求分别为请求读:先读缓存,若命中则返回。若没有命中,从数据库中查询数据写入缓存并返回请求写:先更新数据库,然后将缓存中的数据失效掉(注意是失效而不是更新)
通常在应用中,写缓存和写入数据库是两个独立的事务,选择先更新缓存还是先更新数据库在高并发的情况下,都有可能会产生数据不一致,如以下情况,注:抛开因为如写数据库失败或写缓存失败造成不一致的因素。
问题1:为什么不是先删缓存,再更新数据库?
回答:这种情况,当同时2个并发的读和写请求容易导致脏数据。试想同时有读写2个请求。1. 写请求A首先删除了缓存,并删除成功,这时还未开始更新数据库。2. 读请求B查询缓存未命中,然后查询数据库,查询出了旧数据并将旧数据写入缓存。3. 写请求A继续将新数据写入数据库。4. 此时缓存中的数据就出现了不一致,并且一直脏下去。
问题2:为什么更新操作是将cache失效,而不是更新?
回答:这种情况会造成2方面的问题,
1.同时2个并发的写请求时可能会导致脏数据。2.违背数据懒加载。
- 同时2个并发的写请求时可能会导致脏数据
- 写请求A先更新了数据库。
- 之后写请求B成功更新了数据库,并成功更新了缓存。
- 写请求A最后更新了缓存,此时写请求A的数据已经是脏数据,造成了不一致,并且会一致脏下去。
- 违背数据懒加载,避免不必要的计算消耗:有些缓存值是需要经过复杂的计算才能得出,如果每次更新数据的时候都更新缓存,但是后续在一段时间内并没有读取该缓存数据,这样就白白浪费了大量的计算性能,完全可以后续由读请求的时候,再去计算即可,这样更符合数据懒加载,降低计算开销。
Cache Aside Pattern模式也会出现不一致的问题实际上先更新db,再失效cache这种模式理论上也可能出现问题,只是相对于以上的更新顺序,出现不一致的几率会更小。
- 读请求A首先读取缓存未命中,这个时候去读数据库成功查询到数据。
- 写请求B进来更新数据库成功,并删除缓存的数据成功。
- 最后请求A再将查询的数据写入到缓存中,而此时请求A写入的数据已经是脏数据,造成了数据不一致。
之所以建议用这种更新顺序,因为理论上造成不一致的几率会比较小,要达到不一致需要读请求要先与写请求查询,然后后与写请求返回,通常来说数据库的查询的耗时会小于数据库写入的耗时,所以这种问题出现概率会比较小。
3.2 Read/Write Through Pattern
Cache Aside Pattern模式中由应用方维护数据库和缓存的读写,导致应用方数据库和缓存的维护设计侵入代码,数据层的耦合增大,代码复杂性增加。而Read/Write Through Pattern模式弥补了这一问题,调用方无需管理缓存和数据库调用,通过在设计中多抽象出一层缓存管理组件来负责和缓存和数据库读写维护,并且缓存和数据库的读写维护是同步的。调用方直接和缓存管理组件打交道,缓存和数据库对调用方是透明的视为一个整体。通过分离出缓存管理组件,解耦业务代码。
Read Through:应用向缓存管理组件发送查询请求,由缓存管理组件查询缓存,若缓存未命中,查询数据库,并将查询的数据写入缓存,并返回给应用。
Write Through:Write Through 套路和Read Through相仿,当更新数据的时候,将请求发送给缓存管理组件,由缓存管理组件同步更数据库和缓存数据。
3.3 Write Behind Caching Pattern
Write Behind模式和Write Through模式整个架构是一样的,最核心的一点在于write through在缓存数据库中的更新是同步的,而Write Behind是异步的。每次的请求写都是直接更新缓存然后就成功返回,并没有同步把数据更新到数据库。而把更新到数据库的过程称为flush,触发flush的条件可自定义,如定时或达到一定容量阈值时进行flush操作。并且可以实现批量写,合并写等策略,也有效减少了更新数据的频率,这种模式最大的好处就是读写响应非常快,吞吐量也会明显提升,因为都是跟cache交互。当然这种模式也有其他的问题。例如:数据不是强一致性的,因为选择了把最新的数据放在缓存里,如果缓存在flush到数据库之前宕机了就会丢失数据,另外实现也是最复杂的。
几种模式的优缺点:
模式 | 优点 | 缺点 |
---|---|---|
Cache Aside | 1.实现简单 | 1.需要调用方维护缓存和db的更新逻辑 2.代码侵入大 |
Read/Write Through | 1.引入缓存管理组件,缓存和数据库的维护对应用方式透明的 2.应用代码入侵小,逻辑更清晰 |
1.引入缓存管理组件,实现更复杂 |
Write Behind Caching | 1.读写直接和缓存打交道,异步批量更新数据库,性能最好 2.缓存和数据库对应用方透明 |
1.实现最复杂 2.数据丢失的风险 3.一致性最弱 |
4、如何保证缓存一致性问题?
- 不一致的原因:部分失败或并发导致的
- 互联网场景下,一般追求的最终一致性
- 方案 1:重试机制
- 方案 2:重试+binlog
通过引入Canal和缓存管理组件,将缓存更新的维护和业务代码解耦。另一个原因是,现在的数据库通常是主从架构来提升整体的查询qps,因数据库主从同步的延迟,删除缓存后,如果此时从数据库还未同步完成,新来的请求发现缓存失效了,从从库里查询了已经过期的数据放到缓存中,也会造成数据的不一致。而通过订阅binlog的同步的延迟性,使删除缓存的时序延后,进一步降低不一致的几率。
5、如何解决缓存穿透、击穿、雪崩、大Key、热Key问题?
5.1 缓存穿透
概念:如果大量的非法请求都去查询压根数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去,缓存就形同虚设,缓存命中率为0,这种情况我们称之为缓存穿透。
解决方案:
- 业务非法参数校验
在上层业务上做非法参数校验,尽量避免非法参数的请求case打到cache层。 - 缓存空对象
因为每次查缓存都不存在,然后回溯到db去查询也不存在。因此可以把这种不存在的key也缓存起来,设置标识空的标识值,如“##”,那么就无法穿透到db层,但是要记到设置过期时间。这种方式的好处在于实现简单,但是会占用缓存空间,如果空数据的命中率不高,而且遇到的比较多非法请求时,会增加缓存空间的压力。
public Object getCache(final String key) {
Object value = redis.get(key);
if (value != null) {
if (value.equals("##")) {
return null;
}
return value;
}
Object valueFromDb = getValueFromDb(key);
if (value == null) {
valueFromDb = "##"; //"##"缓存标识为空
}
redis.set(key, valueFromDb, t);
return valueFromDb;
}
-
布隆过滤器
在缓存前加一层布隆过滤器,利用布隆过滤器bitset存储结构存储数据库中所有值,查询缓存前,先查询布隆过滤器,若一定不存在就返回,不用再回溯流量到缓存服务,过程如下:
private final BloomFilter<String> bloomFilter =
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1024 * 1024 * 32);
//查询布隆过滤器中是否存在
public boolean contains(String cacheKey) {
if (StringUtils.isEmpty(cacheKey)) {
return true;
}
boolean exists = bloomFilter.mightContain(cacheKey);
if (!exists) {
bloomFilter.put(cacheKey);
}
return exists;
}
//模拟初始化布隆过滤器,从db填充所有数据
public void initBF() {
int offset = 0;
int limit = 200;
while (true) {
List<String> dataFromDb = listFromDb(offset, limit);
if (CollectionUtils.isEmpty(dataFromDb)) {
break;
}
for (String s : dataFromDb) {
bloomFilter.put(s);
}
offset += limit;
}
}
5.2 缓存击穿
- 概念:当某一个热点key失效的时候,很多请求这一时间都查不到缓存,然后全部请求并发打到了数据库去查询数据构建缓存,造成数据库压力非常大甚至宕机。
- 解决方案
1.互斥锁:
因为是同一时间很多请求并发的访问数据库,把这个动作设置一个分布式锁,只有一个请求能去db访问,其他请求重试等待,解决了全部请求全部查询数据库的问题。这种方案相当于把数据库的访问压力转到了分布式锁的压力上来,有一定的弊端,但是最简单实用。
public Object getCache(final String key) {
Object value = redis.get(key);
//缓存值过期
if (value == null) {
//加mutexKey的互斥锁
if (redis.setnx(mutexKey, 1, time)) {
value = db.get(key);
redis.set(key, value, time);
redis.delete(mutexKey);
} else {
sleep(50);
//重试
return get(key);
}
}
return value;
}
2.软过期+互斥锁:
软过期指对缓存的值里存储逻辑过期时间t1,这个时间比实际要过期的时间t2小(t1<t2),业务取值时候,校验t1是否过期,在发现了数据逻辑时间过期的时候,也是引入一把互斥锁,首先将t1时间延长t1=t1+t并设置到缓存中去,接着去db查询新数据,其他线程这时看到延长了的过期时间,就会继续使用旧数据,等线程获取最新数据后再更新缓存。
这种方案相比第一种进一步减少了读请求线程阻塞的时间,第一种方案阻塞时间block time从数据库查询并设置到缓存中的整个时间段。第二种方案阻塞时间block time:t1=t1+t并设置到缓存中的时间段。
public Object getCache(final String key) {
Object value = redis.get(key);
if (value != null) {
//检验缓存里的逻辑过期时间
if (value.getTimeout() <= currentTimeMillis()) {
if (redis.setnx(mutexKey,time)) {
//立即延长逻辑过期时间,减少阻塞时间
value.setTimeout(value.getTimeout() + t1);
redis.set(key, value, time);
value = db.get(key);
//获取最新db数据,并重新设置新的逻辑过期时间,覆盖旧数据
value.setTimeout(value.getTimeout() + t2);
redis.set(key, value, time);
redis.delete(mutexKey);
} else {
sleep(500);
get(key);
}
}
} else {
//缓存不存在的情况和上面一样
if (redis.setnx(mutexKey,time)) {
value = db.get(key);
redis.set(key, value,time1);
redis.delete(mutexKey);
} else {
sleep(500);
get(key);
}
}
return value;
}
3.静态数据:lazy expiration
这里静态数据的含义是指redis不set expire过期时间,对redis来说认为数据是不过期的是静态的但实际和上面的软过期是一样的,通过value里设置逻辑过期时间,再拿到值判断值过期之后,后台新起异步线程更新缓存,这种方式性能最好
public Object getCache(String key) {
Object value = redis.get(key);
if (value.getTimeout() <= System.currentTimeMillis()) {
// 另起一条线程异步更新缓存
executorService.execute(new Runnable() {
public void run() {
if (redis.setnx(mutexKey, "1")) {
redis.expire(mutexKey, 3 * 60);
String dbValue = db.get(key);
redis.set(key, dbValue);
redis.delete(mutexKey);
}
}
});
}
return value;
}
方法 | 优点 | 缺点 |
---|---|---|
互斥锁 | 1.简单易用 2.一致性保证 |
1.存在线程阻塞的风险 2.数据库访问的压力转到分布式锁上来 |
软过期+互斥锁 | 1.相比互斥锁方案,降低线程阻塞的时间 | 1.代码更复杂 2.逻辑过期时间会占用一定的内存空间 |
静态数据 | 1.数据不过期,异步构建性能最好 2.基本杜绝热点key重建问题 |
1.不能保证一致性 2.代码复杂性增加 3.逻辑过期时间会占用一定的内存空间 |
5.3 缓存雪崩
- 概念:缓存层挡在db层前面,抗住了非常多的流量,在分布式系统中,“everything will fails”,缓存作为一种资源,当cache crash后,流量集中涌入下层数据库,称之为缓存雪崩。造成这种问题通常有2种:
- 业务层面:大量的缓存key同时失效,失效请求全部回源到数据库,造成数据库压力过大崩溃。
- 系统层面:缓存服务宕机。
- 解决方案:
1.分散过期时间:业务层面的原因,主要是缓存key过期时间一致,造成同一时间,大量缓存key同时失效。针对这种问题的解决方案,主要是防止缓存在同一时间一期过期,如在设置的过期时间的基础上增加t1-t2的随机值,使缓存失效时间比较均匀
2.提前演练压测:提前做好系统的演练压测,发现性能瓶颈,预估合适的系统存储和计算容量。
3.cache高可用+后端数据库限流:1. 缓存作为一种系统资源,且通常充当关键路径关键资源,应尽可能提升缓存的可用性,如redis的sentinel和cluster机制等;2. 采用了双缓存热备份方案来进可能提升缓存资源的可用性;3. 后端数据库限流,缓存层宕机,流量集中打到数据库,会再次让db崩溃。为保护这种情况下的db,在db层加入限流。
5.4 热 Key
- 概念:用户的消费速度远远大于生产速度,例如电商平台上线某个热门促销商品,微博大量转发的热门新闻等,这些数据往往查询量非常大。其实缓存击穿也是一种热点key问题,但是这里要讨论的方面不一样,缓存击穿主要侧重的是热key失效后大量并发查询涌向数据库照成的压力,而这里的热key侧重的是热key的访问压力已经大到超过redis性能极限,相对于缓存击穿的热key,这里也可叫巨热数据。
分布式缓存组件,通常会进行分片切分,例如squirrel的cluster机制,查询某个key,会通过key的hash值计算出对应的slot,路由到某个分片的所属机器上。热key出现时,所有热点访问的请求都会路由到同一个redis server,该节点的负载严重加剧,并且这种现象通常不是马上加机器就能解决,因为同一个请求key还是会落到同一个新机器上,瓶颈依然存在。并且如果这个key还是大key ,甚至可能达到物理网卡极限,服务被打垮宕机,造成雪崩,成为系统瓶颈和风险。因此热点key会有以下问题。- 流量集中,达到物理网卡上限。
- 请求过多,缓存分片服务被打垮。
- 缓存分片打垮,重建再次被打垮,引起业务雪崩。
- 解决方案
1.多级缓存:本地缓存->Redis->DB
2.多副本:当发现某个热key的时候,增加热key所在节点的从副本,这种情况对读多写少的情况比较有效。但是也增加了多副本同步不一致的风险。
3.迁移热key:当发现某个slot里热key的时候,将该slot的单独迁移到新的节点,和集群其他节点隔离,避免影响集群节点其他业务。
5.5 大 Key
概念:业务场景中经常会有各种大value多value的情况, 比如:1. 单个string 类型 key 存的value很大, 超过 1MB。2. hash, set,zset,list 中存储过多的元素,超过 10K。3. 一个集群存储了上亿的key,key本身过多也带来了更多的空间占用(如无例外,文章中所提及的hash,set等数据结构均指redis中的数据结构)由于redis是单线程运行的,如果一次操作的value很大会对整个redis的响应时间造成负面影响,所以,业务上能拆则拆,下面举几个典型的分拆方案。
string 类型大key处理方式
1:该对象需要每次都整存整取
可以尝试将对象分拆成几个key-value
使用multiGet获取值,这样分拆的意义在于分拆单次操作的压力,将操作压力平摊到多个redis实例中,降低对单个redis的IO影响和 CPU 的影响
2:该对象每次只需要存取部分数据
以像第一种做法一样,分拆成几个key-value,也可以将这个存储在一个hash中,每个field代表一个具体的属性,
使用hget,hmget来获取部分的value,使用hset,hmset来更新部分属性集合类型大 key处理方式
类似于场景一种的第一个做法,可以将这些元素分拆。以hash为例,原先的正常存取流程是 hget(hashKey, field) ; hset(hashKey, field, value)
现在,固定一个桶的数量,比如 10000, 每次存取的时候,先在本地计算field的hash值,模除 10000, 确定了该field落在哪个key上。newHashKey = hashKey + ( hash(field) % 10000); hset (newHashKey, field, value) ; hget(newHashKey, field)
set, zset, list 也可以类似上述做法。
但有些不适合的场景,比如,要保证 lpop 的数据的确是最早push到list中去的,这个就需要一些附加的属性,或者是在 key的拼接上做一些工作(比如list按照时间来分拆)。
6、如何保证 Redis 高性能?
- Redis 的高性能源自两方面,一方面是 Redis 处理命令的时候,都是纯内存操作。另外一方面,在 Linux 系统上 Redis 采用了 epoll 和 Reactor 结合的 IO 模型,非常高效。
- 为了保证性能最好,Redis 使用的是基于 epoll 的 Reactor 模式。 Reactor 模式可以看成是一个分发器 + 一堆处理器。Reactor 模式会发起 epoll 之类的系统调用,如果是读写事件,那么就交给 Handler 处理;如果是连接事件,就交给 Acceptor 处理。
Redis 是单线程模型,所以 Reactor、Handler 和 Acceptor 其实都是这个线程。 整个过程是这样的: Redis 中的 Reactor 调用 epoll,拿到符合条件的文件描述符。假如说 Redis 拿到了可读写的描述符,就会执行对应的读写操作。如果 Redis 拿到了创建连接的文件描述符,就会完成连接的初始化,然后准备监听这个连接上的读写事件。
Redis 在 6.0 引入多线程,整个 Redis 在多线程模式下,可以看作是单线程 Reactor、单线程 Acceptor 和多线程 Handler 的 Reactor 模式。只不过 Redis 的主线程同时扮演了 Reactor 中分发事件的角色,也扮演了接收请求的角色。同时多线程 Handler 在 Redis 里面仅仅是读写数据,命令的执行还是依赖于主线程来进行的。
7、如何保证 Redis分布式锁的高可用和高性能?
参考:分布式系统互斥性与幂等性问题的分析与解决
Cerberus在分布式场景下提供互斥原语,能够实现对并发场景下共享“资源”的保护。分布式锁主要解决两类问题。
• 互斥保护:加锁是为了避免Race Condition导致逻辑错误。例如直接使用分布式锁实现防重,幂等机制。此时如果锁出现错误会引起严重后果,因此对锁的正确性要求高。
• 高效去重:加锁是为了避免不必要的重复处理。例如防止幂等任务被多个执行者抢占。此时对锁的正确性要求不高;
其适用的最常见业务场景是:
a. 避免资源抢占,资源抢占可能导致重复执行的问题。例如运行定时任务前加锁,处理结束后解锁。
b. 防止并发写入,并发更新可能导致ABA问题。例如分发优惠券时,更新余量前加锁避免超量发券。
c. 选主/降级服务,对某个Node加锁等效为该Node成为Master。
Cerberus如何处理高一致性要求的场景?
目前Cerberus实现了基于ZooKeeper的Lock-Engine,用于在一致性要求高的场景下提供分布式锁服务;
Cerberus如何处理高性能要求的场景?
目前Cerberus实现了基于Redis的Lock-Engine,用于处理优先考虑高吞吐量需求的业务场景。
锁使用 Demo:
@Autowired
private IDistributedLockManager distributedLockManager;
@Override
public void lock(String lockName) {
Lock lock = distributedLockManager.getReentrantLock(lockName);
String name = Thread.currentThread().getName();
// 获取锁
logger.info("Thread={} try to acquire lock", name);
lock.lock();
try {
// 处理任务
logger.info("Thread={} do something...", name);
} finally {
// 释放锁
lock.unlock();
logger.info("Thread={} unlocked", name);
}
}
@Override
public void tryLock(String lockName) {
Lock lock = distributedLockManager.getReentrantLock(lockName);
String name = Thread.currentThread().getName();
// 获取锁
logger.info("Thread={} try to acquire lock", name);
if (lock.tryLock()) {
try {
// 处理任务
logger.info("Thread={} do something...", name);
}finally {
lock.unlock();// 释放锁
logger.info("Thread={} unlocked", name);
}
} else {
// 如果不能获取锁,则直接做其他事情
logger.info("Thread={} do other things...", name);
}
}
@Override
public void tryLock(String lockName, long time, TimeUnit timeUnit) throws InterruptedException {
Lock lock = distributedLockManager.getReentrantLock(lockName);
String name = Thread.currentThread().getName();
// 获取锁
logger.info("Thread={} try to acquire lock", name);
if (lock.tryLock(time,timeUnit)) {
try {
// 处理任务
logger.info("Thread={} do something...", name);
}finally {
lock.unlock();// 释放锁
logger.info("Thread={} unlocked", name);
}
} else {
// 如果不能获取锁,则直接做其他事情
logger.info("Thread={} do other things...", name);
}
}
@Override
public void lockInterruptibly(String lockName) throws InterruptedException {
Lock lock = distributedLockManager.getReentrantLock(lockName);
String name = Thread.currentThread().getName();
// 获取锁
logger.info("Thread={} try to acquire lock", name);
lock.lockInterruptibly();
try {
// 处理任务
logger.info("Thread={} do something...", name);
} finally {
// 释放锁
lock.unlock();
logger.info("Thread={} unlocked", name);
}
}
接口说明:
package com.meituan.hotel.dlm.lock;
public interface Lock {
//阻塞接口,如果此线程无法获得锁会一直阻塞直到获得锁,不响应中断
void lock() throws CerberusDLMException;
//与lock()的区别在于lockInterruptibly()可以响应中断
void lockInterruptibly() throws InterruptedException, CerberusDLMException;
//非阻塞接口,如果此线程可以获得锁返回true并持有锁,否则返回false
boolean tryLock() throws CerberusDLMException;
/**
* 阻塞接口,如果此线程无法获得锁会一直阻塞,有3种情况结束阻塞
* 1. 超过timeout限制的时长仍未获得锁,返回false;
* 2. 此线程被中断,抛出InterruptedException异常;
* 3. timeout限制的时长内获取到锁,返回true;
*/
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException, CerberusDLMException;
//解锁
void unlock();
//返回锁名
String getName();
}
public interface IDistributedLockManager {
// 获得相应的可重入锁
Lock getReentrantLock(String lockName);
Lock getReentrantLock(String lockName, int expireTime);
Lock getReentrantLock(String lockName, int expireTime, int retry);
Lock getNewMultiLock(Lock[] locks);
Lock getNewMultiLock(String[] locks);
Lock getNewMultiLock(String[] locks, int expireTime);
}
核心原理:
- 加锁
private RenewalWatchdog watchdog;
/**
* 线程本地变量, 用于锁重入检查
*/
private ThreadLocal<LockHolder> locks = new ThreadLocal<>();
public void lock() {
try {
// 锁重入检查
if (reentrant()) {
t.setStatus(Transaction.SUCCESS);
return;
}
// 自旋, 直到获取锁
while (true) {
try {
if (squirrelProcessor.add(key, uuid, expireTime, retry)) {
lockAtOutermost();
log.info("Try acquire lock success. key: {}, uuid: {}", key, uuid);
t.setStatus(Transaction.SUCCESS);
return;
} else {
log.debug("Try acquire lock failed. key: {}, uuid: {}", key, uuid);
}
} catch (Exception e) {
log.error("Redis try acquire lock failed, key: {}", key, e);
t.setStatus(e);
sliTx.setStatus(e);
throw new RuntimeException(e);
}
}
}
}
protected void lockAtOutermost() {
locks.set(new LockHolder(key.toString()));
watchdog.watch(this);
}
/**
* 锁重入检查
*
* @return 重入则返回 true; 否则返回 false
*/
private boolean reentrant() {
try {
if (LockHolder.checkReentrancy(locks)) {
// reentrant, refresh lease time
// return squirrelProcessor.compareAndSet(key, this.uuid, this.uuid, expireTime, retry);
watchdog.watch(this);
return true;
} else {
return false;
}
} catch (Exception e) {
throw new RuntimeException("Redis refresh lease time failed, key: " + key, e);
}
}
public boolean add(StoreKey key, String value, int leaseTime, int retry) throws Exception {
try{
Boolean result;
try {
result = redisStoreClient.add(key, value, leaseTime);
} catch (Exception e) {
result = exceptionHandler(SquirrelMethodEnum.ADD, e, key, value, null, leaseTime, retry);
if (false == result) {
// 有可能客户端超时,但服务端已成功添加(key,value)
if (redisStoreClient.exists(key)) {
// 若key存在,获取value1,并判断value1是否与value相等,相等则返回true
String getValue = redisStoreClient.get(key);
t.setStatus(Transaction.SUCCESS);
return StringUtils.equals(value, getValue);
}
}
}
t.setStatus(Transaction.SUCCESS);
return result;
}
}
/**
* 处理squirrel的操作抛出的异常,根据重试次数进行回调
*
* @param type
* @param squirrelException
* @param key
* @param oldValue
* @param newValue
* @param expireTime
* @param retry
* @return
* @throws Exception
*/
private boolean exceptionHandler(SquirrelMethodEnum type, Exception squirrelException, StoreKey key, String oldValue, String newValue,
int expireTime, int retry) throws Exception {
if (Thread.interrupted())
throw new InterruptedException();
boolean result = false;
if (retry > 0) {
retry--;
Thread.sleep(100);
LOGGER.info(type.getField() + " error; Retry:" + (retry + 1) + "; Key:" + key + "; exception:" + squirrelException.toString());
switch (type) {
case ADD:
result = this.add(key, oldValue, expireTime, retry);
break;
case COMPARE_AND_DELETE:
result = this.compareAndDelete(key, oldValue, retry);
break;
case COMPARE_AND_SET:
result = this.compareAndSet(key, oldValue, newValue, expireTime, retry);
break;
default:
break;
}
} else {
LOGGER.error(type.getField() + " error; Key:" + key, squirrelException);
throw squirrelException;
}
return result;
}
@Override
public Boolean add(StoreKey key, final Object value, final int expire) {
return setnx(key, value, expire);
}
@Override
public Boolean setnx(StoreKey key, final Object value, final int expireInSeconds) {
final StoreCategoryConfig categoryConfig = categoryConfigManager.findCacheKeyType(key.getCategory());
final String finalKey = categoryConfig.getFinalKey(key);
return new MonitorCommand(new Method(Method.Command.WRITE, "setnx", finalKey).expire(expireInSeconds)
, storeType, categoryConfig) {
@Override
public Object excute() throws Exception {
byte[] str = transcoder.encodeToBytes(value);
if (expireInSeconds > 0) {
String result = clientManager.getClient().set(SafeEncoder.encode(finalKey), str, "NX", "EX", expireInSeconds);
return OK_STR.equals(result);
} else {
return 1 == clientManager.getClient().setnx(SafeEncoder.encode(finalKey), str);
}
}
}.run();
}
- 尝试加锁
public boolean tryLock() {
try{
// 锁重入检查
if (reentrant()) {
t.setStatus(Transaction.SUCCESS);
return true;
}
try {
if (squirrelProcessor.add(key, uuid, expireTime, retry)) {
lockAtOutermost();
log.info("Try acquire lock success. key: {}, uuid: {}", key, uuid);
t.setStatus(Transaction.SUCCESS);
return true;
} else {
log.debug("Try acquire lock failed. key: {}, uuid: {}", key, uuid);
t.setStatus(Transaction.SUCCESS);
return false;
}
} catch (Exception e) {
log.error("Redis try acquire lock failed, key: {}", key, e);
throw new RuntimeException(e);
}
}
}
/**
* 一直阻塞直到获取锁, 除非超过等待时间, 或者线程被中断;
* 获取锁后, 持有的锁超过指定的时间后自动过期
*
* @param waitTime 等待时间
* @param unit 时间单位
* @return 获取锁成功返回 true; 否则返回 false
* @throws InterruptedException 如果当前线程被中断, 抛出该异常
*/
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
try {
Preconditions.checkNotNull(unit, "时间单位不能为空");
// 锁重入检查
if (reentrant()) {
t.setStatus(Transaction.SUCCESS);
return true;
}
long timeout = System.nanoTime() + unit.toNanos(waitTime);
if (timeout < 0) {
timeout = Long.MAX_VALUE;
}
while (true) {
try {
if (squirrelProcessor.add(key, uuid, expireTime, retry)) {
lockAtOutermost();
log.info("Try acquire lock success. key: {}, uuid: {}", key, uuid);
t.setStatus(Transaction.SUCCESS);
return true;
}
} catch (InterruptedException e) {
t.setStatus("thread interrupted");
throw new InterruptedException();
} catch (Exception e) {
log.error("Redis try acquire lock failed, key: {}", key, e);
throw new RuntimeException(e);
}
if (System.nanoTime() >= timeout) {
log.debug("Try acquire lock timeout. key: {}, uuid: {}", key, uuid);
t.setStatus(Transaction.SUCCESS);
return false;
} else {
log.debug("Try acquire lock failed, will try again. key: {}, uuid: {}", key, uuid);
}
if (Thread.interrupted()) {
t.setStatus("thread interrupted");
throw new InterruptedException();
}
}
}
}
/**
* 一直阻塞到获得锁, 除非线程被中断;
* 获取锁后, 持有的锁超过指定的时间后自动过期
*
* @throws InterruptedException 如果当前线程被中断, 抛出该异常
*/
@Override
public void lockInterruptibly() throws InterruptedException {
tryLock(Long.MAX_VALUE, TimeUnit.DAYS);
}
- 解锁
public void unlock() {
try{
LockHolder lockHolder = this.locks.get();
if (lockHolder == null) {
t.setStatus(Transaction.SUCCESS);
throw new IllegalMonitorStateException("Attempting to unlock without first obtaining that lock on this thread");
}
int lockCounts = lockHolder.decrementLock();
try {
if (lockCounts == 0) {
// locks.remove();
unlockAtOutermost();
if (squirrelProcessor.compareAndDelete(key, uuid, retry)) {
log.info("Release lock success. key: {}, uuid: {}", key, uuid);
} else {
log.debug("Release lock failed. key: {}, uuid: {}", key, uuid);
}
}
} catch (Exception e) {
log.error("Redis try release lock failed. key: {}", key, e);
t.setStatus(e);
throw new RuntimeException(e);
}
t.setStatus(Transaction.SUCCESS);
}
}
public boolean compareAndDelete(StoreKey key, String value, int retry) throws Exception {
try{
Boolean result;
try {
result = redisStoreClient.compareAndDelete(key, value);
} catch (Exception e) {
result = exceptionHandler(SquirrelMethodEnum.COMPARE_AND_DELETE, e, key, value, null, -1, retry);
}
t.setStatus(Transaction.SUCCESS);
return result;
}
}
public Boolean compareAndDelete(StoreKey key, final Object expect) {
checkNotNull(key, STORE_KEY_IS_NULL);
checkNotNull(expect, STORE_VALUE_IS_NULL);
final StoreCategoryConfig categoryConfig = categoryConfigManager.findCacheKeyType(key.getCategory());
final String finalKey = categoryConfig.getFinalKey(key);
return new MonitorCommand(new Method(Method.Command.WRITE, "compareAndDelete", finalKey)
, storeType, categoryConfig) {
@Override
public Object excute() throws Exception {
return clientManager.getClient().compareAndDelete(SafeEncoder.encode(finalKey),
transcoder.encodeToBytes(expect));
}
}.run();
}
// jedis
public Boolean compareAndDelete(final byte[] key, final byte[] expect) {
return new JedisClusterCommand<Boolean>(connectionHandler, maxRedirections, Protocol.Command.SET) {
@Override
public Boolean execute(Jedis connection) {
return connection.cad(key, expect) == 1;
}
}.runBinary(key);
}
- 工具类:
public class LockHolder {
private final String lockNode;
private final AtomicInteger numLocks = new AtomicInteger(1);
private final int lockTime;
public LockHolder(String lockNode) {
this.lockNode = lockNode;
this.lockTime = -1;
}
public LockHolder(int lockTime) {
this.lockTime = lockTime;
this.lockNode = "";
}
public void incrementLock() {
numLocks.incrementAndGet();
}
public int decrementLock() {
return numLocks.decrementAndGet();
}
public String getLockNode() {
return lockNode;
}
public int getLockTime() {
return lockTime;
}
public static boolean checkReentrancy(ThreadLocal<LockHolder> locks) {
LockHolder local = locks.get();
if(local!=null){
local.incrementLock();
return true;
}
return false;
}
}
@Slf4j
public abstract class RenewalWatchdog<T extends Lock> {
protected static final long SHUT_DOWN_WAITING_MILI = 5000;
/**
* 刷新间隔, nanosecond
*/
protected long renewalIntervalNano;
/**
* 续租步长, nanosecond
*/
protected long renewalStepNano;
protected ScheduledExecutorService renewalScheduler = new ScheduledThreadPoolExecutor(1);
protected ThreadPoolExecutor renewalExecutor = createProcessorExecutor();
protected final Map<String, LockTimer<T>> LOCK_TIMER_MAP;
/**
* @param intervalNano nanosecond
* @param stepSizeNano nanosecond
* @param mapInitSize map init size
*/
public RenewalWatchdog(long intervalNano, long stepSizeNano, int mapInitSize) {
log.info("New RenewalWatchdog initiated. {}, {}, {}", intervalNano, stepSizeNano, mapInitSize);
this.renewalIntervalNano = intervalNano;
this.renewalStepNano = stepSizeNano;
this.LOCK_TIMER_MAP = new ConcurrentHashMap<>(mapInitSize);
renewalScheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (CollectionUtils.isEmpty(LOCK_TIMER_MAP)) {
log.info("LOCK_TIMER_MAP empty, no lock need to be renewed.");
return;
}
int counter = 0;
for (LockTimer<T> item : LOCK_TIMER_MAP.values()) {
log.debug("trying to renew lock, timer: ", item);
CatUtils.logSquirrelWatchDogEvent("LOCK_Renewal");
if (renewal(item)) {
counter++;
}
}
log.info("{} locks' lease renewed.", counter);
} catch (Exception e) {
log.warn("renewal failed.", e);
}
}
}, this.renewalIntervalNano, this.renewalIntervalNano, TimeUnit.NANOSECONDS);
}
/**
* @param lock
* @return
*/
public void watch(T lock) {
log.debug("watch lock {}", lock);
LOCK_TIMER_MAP.put(lock.getName(), createLockTimer(lock));
}
public void unWatch(T lock) {
if (lock == null) {
log.warn("lock cannot be null.");
return;
}
log.debug("unwatch lock {}", lock);
LOCK_TIMER_MAP.remove(lock.getName());
}
public void destroy() {
renewalScheduler.shutdown();
renewalExecutor.shutdown();
try {
if (renewalScheduler.awaitTermination(SHUT_DOWN_WAITING_MILI, TimeUnit.MILLISECONDS)) {
renewalScheduler.shutdownNow();
renewalScheduler = null;
}
if (renewalExecutor.awaitTermination(SHUT_DOWN_WAITING_MILI, TimeUnit.MILLISECONDS)) {
renewalExecutor.shutdownNow();
renewalExecutor = null;
}
} catch (InterruptedException e) {
log.warn("scheduler.awaitTermination interrupted.");
} catch (Exception ex) {
log.warn("scheduler.awaitTermination error.", ex);
} finally {
LOCK_TIMER_MAP.clear();
}
log.info("RenewalWatchdog destroyed.");
}
/**
* threadpool can be refined
* @return
*/
protected ThreadPoolExecutor createProcessorExecutor() {
return new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
/**
*
*/
protected boolean renewal(LockTimer<T> timer) {
if (timer == null) {
return false;
}
if (timer.isExpired()) {
log.debug("unwatch expired lock, time: {}", timer);
//need remove timer.
unWatch(timer.getLock());
return false;
}
//put into renewal threadPool
renewalExecutor.execute(timer);
return true;
}
protected abstract LockTimer<T> createLockTimer(T lock);
}
- watchdog机制实现
1.解决的问题
直接设置Redis超时 | 实现watchdog续租机制(1.7.4-snapshot) |
---|---|
设置合理的超时时间比较困难 | 设置合理的超时时间比较简单 |
超时时间设置过长:存在client端失效时,死锁不能及时被释放的问题 | 可以直接设置较长超时时间,client失效后持有的锁会在短时间内被释放,不需要等到指定的超时时间后释放。 |
超时时间设置过短:存在锁被强制释放,导致并发访问的问题 |
2.实现方案
- SquirrelLockWatchdog负责管理squirrel引擎下所有lock的续租
- 每次获取到锁,向SquirrelLockWatchdog注册锁自身
- 每次锁超时,或者被从squirrel释放,在SquirrelLockWatchdog移除自身
- SquirrelLockWatchdog使用定时线程池,定期为每个锁续租
- 续租过程由专用线程池并发处理
- 续租参数如下:
- 续租的刷新间隔为500ms
- 每次续租步长为2s(所以锁的超时时间最小值为2s)
- 锁余下的超时时间不足时,续租步长为剩余时间(不足一秒时向上取整)
3.详细流程
8、如何利用缓存提高应用性能的?
- 多级缓存:本地缓存+Redis(定价策略、指标元数据等)
- 缓存预热+预加载:启动时加载热点数据、启动后逐步放流量加载(定价策略预热)
- 客户端缓存:针对依赖较慢且不经常变的接口,可做缓存(调价工作台与权限系统交互)