@[toc]
一、缓存
1、缓存使用
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而db承担数据落盘工作。
哪些教据适合放入緩存?
即时性、数据一致性要求不高的
访问量大且更新频率不高的数据(读多,写少)
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定),后台如果发布一个商品,买家需要5分钟才能看到新的商品一般还是可以接受的。
伪代码逻辑:
data = cache.load(id);//从缓存加载数据
if(data == null){
data = db.loadid);//从数据库加载数据
cache.put(id,data);//保存到cache中
}
retum data;
注意:在开发中,凡是放入缓存中的数据都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致题。
本地缓存:
适合单体应用
分布式缓存-本地模式在分布式下的问题:
缓存一致性问题、拓展性问题、高可用问题
分布式缓存:
可以解决前面两个的不足,目前最常使用的是redis
2、整合 redis 作为缓存
需要创建一个 Spring Boot 项目来整合 Redis。如果还没有安装 Redis,那么 Redis 的安装可以参考:在CentOS中安装和使用Docker 这篇内容。或者使用 Windows 版本的 Redis 也是可以的。
1、配置pom
文件
在 SpringBoot
项目的 pom
文件中引入 redis
依赖,可以不用写版本号,使用SpringBoot
的默认配置项:
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
引入依赖之后,项目中就会有 RedisAutoConfiguration.java
自动配置类,可以进行 redis 的自动配置。 RedisAutoConfiguration.java
将 redis 的所有配置属性都放在 RedisProperties.java
类中。
2、配置application.yml
文件:
spring:
redis:
host: 192.168.56.10 # redis地址
port: 6379 # 端口号,默认为6379.相同的话也可以不配
3、测试 redis
在 RedisAutoConfiguration.java
类中,已经为我们提供了RedisTemplate<Object, Object>
和 StringRedisTemplate
两个类,来封装 redis 的操作,下面来使用 StringRedisTemplate
测试一下。
在测试类中添加下面的代码:
@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void testStringRedisTemplate(){
// 操作字符串
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
// 保存
ops.set("hello", "world_"+ UUID.randomUUID().toString());
// 查询
String hello = ops.get("hello");
System.out.println("获取之前保存的数据:"+hello);
}
测试输出结果:
获取之前保存的数据:world_90bf25e1-2e84-4f50-b6e2-5eaab32b4175
还可以通过安装 redis 可视化工具 RedisDesktopManager 来查看之前保存的数据:
二、缓存失效问题
1、高并发下缓存失效问题-缓存穿透
缓存穿透:
指查询一个
一定不存在
的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
风险:
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃
解决:
null结果缓存,并加入短暂过期时间
2、高并发下缓存失效问题-缓存雪崩
缓存雪崩:
缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时
压力过重雪崩。
解决:
原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
3、高并发下缓存失效问题-缓存击穿
缓存击穿:
• 对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。
• 如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。
解决:
加锁,大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db
三、缓存数据一致性
四、分布式锁
1、分布式下如何加锁?
先来看一个本地锁的例子:我们有一个商品服务,每一个服务都部署在一个独立的tomcat中,每一个服务中都使用一个锁。假设目前有8个服务,则需要加8把锁,且这8把锁相互独立。
本地锁,只能锁住当前进程,所以我们需要分布式锁
2、锁-时序问题
在加锁的时候,需要将设置查数据库和设置缓存这一步同时放入加锁的方法中,斗则会出现多次查询数据库的情况,这是由于第一次查询数据库的时候,数据还没有放入缓存,而设置缓存也是需要时间的,在设置缓存的这段时间内,缓存中还没有数据,就有可能由于并发较高,导致多次查询数据库,没有命中缓存,所以就需要就将设置缓存放到加锁查数据库的逻辑里。
3、分布式锁演进-基本原理
由于本地锁只能锁住当前进程,如果我们在进行秒杀活动或者说抢优惠券活动的时候,如果只剩了1件商品或者1张优惠券,如果使用的是本地锁,同时多个服务一块请求获取数据,就有可能产生“超卖”的现象,为了避免这种情况的发生,我们就需要使用分布式锁。
我们可以同时去一个地方“占坑(加锁)”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。
“占坑(加锁)”可以去 redis,也可以去数据库,可以去任何服务都能访问的地。
如果没有获取到锁,则可以可以以自旋的方式进行等待。
1、分布式锁演进-V1,setnx("lock","1")
/**
* 从数据库获取数据,使用redis的分布式锁 V1
* 问题:
* 1、setnx占好了位,业务代码异常或者程序在页面过程
* 中宕机。没有执行删除锁逻辑,这就造成了死锁
* 解决:
* 设置锁的自动过期,即使没有删除,会自动删除
* @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV2()
*
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV1() {
//1、占分布式锁,去redis占锁,对用redis命令 set lock 1 NX
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock){
//加锁成功,执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
//加锁失败...重试
//休眠100ms重试
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
存在问题:
setnx占好了位,业务代码异常或者程序在执行过程中宕机。没有执行删除锁逻辑,这就造成了
死锁
如何解决:
设置锁的自动过期,即使没有删除,会自动删除
2、分布式锁演进-V2,setnx("lock","1")+设置锁过期时间
/**
* 从数据库获取数据,使用redis的分布式锁 V2
* 问题:
* 1、setnx设置好,正要去设置过期时间,宕机。又死锁了。
* 解决:
* 设置过期时间和占位必须是原子的。redis支持使用 setnx ex 命令 (set lock 1 EX 30 NX 加锁和设置过期时间在一个语句中完成,设置30秒过期)
* @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV3()
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV2() {
//1、占分布式锁,去redis占锁,对用redis命令 set lock 1 NX
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock){
//加锁成功,执行业务
//2、设置过期时间
stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
//加锁失败...重试
//休眠100ms重试
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
问题:
setnx设置好,正要去设置过期时间,结果突然断电,服务宕机。又死锁了。
解决:
设置过期时间和占位必须是原子的
。redis支持使用setnx ex
命令
3、分布式锁演进-V3,setnx ex 原子操作
/**
* 从数据库获取数据,使用redis的分布式锁 V3
* 问题:
* 1、删除锁直接删除的问题?
* 如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。
* 解决:
* 占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除。
* @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV4()
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV3() {
//1、占分布式锁,去redis占锁,对用redis命令
//2、设置过期时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 30, TimeUnit.SECONDS);
if (lock){
//加锁成功,执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
//加锁失败...重试
//休眠100ms重试
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
问题:
删除锁直接删除的问题?
如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。
解决:
占锁的时候,值指定为
uuid
,每个人匹配是自己的锁才删除。
4、分布式锁演进-V4,setnx ex 原子操作+唯一锁值
/**
* 从数据库获取数据,使用redis的分布式锁 V4
* 问题:
* 1、如果正好判断是当前值,正要删除锁的时候,锁已经过期,别人已经设置到了新的值。那么我们删除的是别人的锁
* 解决:
* 删除锁必须保证原子性。使用redis+Lua脚本完成
* @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV5()
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV4() {
//1、占分布式锁,去redis占锁,对用redis命令
//2、设置过期时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
//使用uuid作为锁的值
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if (lock){
//加锁成功,执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//删除锁前先进行获取,判断是不是自己的锁编号uuid,是的话再删除
String lockValue = stringRedisTemplate.opsForValue().get("lock");
if (uuid.equals(lockValue)){
//删除自己的锁
stringRedisTemplate.delete("lock");
}
return dataFromDb;
}else {
//加锁失败...重试
//休眠100ms重试
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
问题:
如果正好判断是当前值,正要删除锁的时候,锁已经过期,别人已经设置到了新的值。那么我们删除的是别人的锁。
也就是说,业务执行时间大于锁的过期时间,这个时候,删除的锁就不是之前业务的锁,而是后来业务的锁。
解决:
删除锁必须保证原子性
。使用redis+Lua脚本
完成
5、分布式锁演进-V5,setnx ex 原子操作+唯一锁值+Lua脚本删除锁保证原子性
/**
* 从数据库获取数据,使用redis的分布式锁 V5
* 保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。使用redis+Lua脚本完成
* 更难的事情,是锁的自动续期
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV5() {
//1、占分布式锁,去redis占锁,对用redis命令
//2、设置过期时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
//使用uuid作为锁的值
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if (lock){
log.info("获取分布式锁成功....");
Map<String, List<Catelog2Vo>> dataFromDb;
try {
//加锁成功,执行业务
dataFromDb = getDataFromDb();
} finally {
//删除锁前先进行获取,判断是不是自己的锁编号uuid,是的话再删除
//获取对比值+对比成删除==原子操作 使用lua脚本解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除锁,删除成功返回 1,删除失败返回 0
Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
Arrays.asList("lock"), uuid);
}
return dataFromDb;
}else {
log.info("获取分布式锁失败,等待重试....");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
//加锁失败...重试
//休眠100ms重试
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
存在问题:
1、更难的事情,是锁的自动续期
2、使用前面 v1 - v5 的操作太麻烦,加锁解锁都需要自己完成,如果有很多锁则需要写很多重复的代码
如何解决:
使用封装好的 reids 分布式锁工具类,下一节来介绍
4、分布式锁-Redisson简介&整合
先来援引一段文档:
命令
SET resource-name anystring NX EX max-lock-time
是一种用 Redis 来实现锁机制的简单方法。如果上述命令返回
OK
,那么客户端就可以获得锁(如果上述命令返回Nil,那么客户端可以在一段时间之后重新尝试),并且可以通过DEL命令来释放锁。客户端加锁之后,如果没有主动释放,会在过期时间之后自动释放。
可以通过如下优化使得上面的锁系统变得更加鲁棒:
- 不要设置固定的字符串,而是设置为随机的大字符串,可以称为token。
- 通过脚步删除指定锁的key,而不是DEL命令。
上述优化方法会避免下述场景:a客户端获得的锁(键key)已经由于过期时间到了被redis服务器删除,但是这个时候a客户端还去执行DEL命令。而b客户端已经在a设置的过期时间之后重新获取了这个同样key的锁,那么a执行DEL就会释放了b客户端加好的锁。
解锁脚本的一个例子将类似于以下:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
注意: 上这种设计模式并不推荐用来实现redis分布式锁。应该参考the Redlock algorithm的实现,因为这个方法只是复杂一点,但是却能保证更好的使用效果。
1、回顾:单Redis实例实现分布式锁的正确方法
在尝试克服上述单实例设置的限制之前,让我们先讨论一下在这种简单情况下实现分布式锁的正确做法,实际上这是一种可行的方案,尽管存在竞态,结果仍然是可接受的,另外,这里讨论的单实例加锁方法也是分布式加锁算法的基础。
获取锁使用命令:
SET resource_name my_random_value NX PX 30000
这个命令仅在不存在key的时候才能被执行成功(NX选项),并且这个key有一个30秒的自动失效时间(PX属性)。这个key的值是“my_random_value”(一个随机值),这个值在所有的客户端必须是唯一的,所有同一key的获取者(竞争者)这个值都不能一样。
value的值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:只有key存在并且存储的值和我指定的值一样才能告诉我删除成功。可以通过以下Lua脚本实现:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
使用这种方式释放锁可以避免删除别的客户端获取成功的锁。举个例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。如果仅使用DEL命令将key删除,那么这种情况就会把客户端B的锁给删除掉。使用Lua脚本就不会存在这种情况,因为脚本仅会删除value等于客户端A的value的key(value相当于客户端的一个签名)。
这个随机字符串应该怎么设置?我认为它应该是从/dev/urandom产生的一个20字节随机数,但是我想你可以找到比这种方法代价更小的方法,只要这个数在你的任务中是唯一的就行。例如一种安全可行的方法是使用/dev/urandom作为RC4的种子和源产生一个伪随机流;一种更简单的方法是把以毫秒为单位的unix时间和客户端ID拼接起来,理论上不是完全安全,但是在多数情况下可以满足需求.
key的失效时间,被称作“锁定有效期”。它不仅是key自动失效时间,而且还是一个客户端持有锁多长时间后可以被另外一个客户端重新获得。
截至到目前,我们已经有较好的方法获取锁和释放锁。基于Redis单实例,假设这个单实例总是可用,这种方法已经足够安全。现在让我们扩展一下,假设Redis没有总是可用的保障。
2、Redlock算法
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
1、这个算法是异步的么?
算法基于这样一个假设:虽然多个进程之间没有时钟同步,但每个进程都以相同的时钟频率前进,时间差相对于失效时间来说几乎可以忽略不计。这种假设和我们的真实世界非常接近:每个计算机都有一个本地时钟,我们可以容忍多个计算机之间有较小的时钟漂移。
从这点来说,我们必须再次强调我们的互相排斥规则:只有在锁的有效时间(在步骤3计算的结果)范围内客户端能够做完它的工作,锁的安全性才能得到保证(锁的实际有效时间通常要比设置的短,因为计算机之间有时钟漂移的现象)。.
想要了解更多关于需要时钟漂移间隙的相似系统, 这里有一个非常有趣的参考: Leases: an efficient fault-tolerant mechanism for distributed file cache consistency.
2、失败时重试
当客户端无法取到锁时,应该在一个随机延迟后重试,防止多个客户端在同时抢夺同一资源的锁(这样会导致脑裂,没有人会取到锁)。同样,客户端取得大部分Redis实例锁所花费的时间越短,脑裂出现的概率就会越低(必要的重试),所以,理想情况一下,客户端应该同时(并发地)向所有Redis发送SET命令。
需要强调,当客户端从大多数Redis实例获取锁失败时,应该尽快地释放(部分)已经成功取到的锁,这样其他的客户端就不必非得等到锁过完“有效时间”才能取到(然而,如果已经存在网络分裂,客户端已经无法和Redis实例通信,此时就只能等待key的自动释放了,等于被惩罚了)。
3、释放锁
释放锁比较简单,向所有的Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁.
3、Redisson简介&整合
1. 概述
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
关于Redisson项目的详细介绍可以在官方网站找到。
每个Redis服务实例都能管理多达1TB的内存。
能够完美的在云计算环境里使用,并且支持AWS ElastiCache主备版,AWS ElastiCache集群版,Azure Redis Cache和阿里云(Aliyun)的云数据库Redis版
以下是Redisson的结构:
- Redisson作为独立节点 可以用于独立执行其他节点发布到分布式执行服务 和 分布式调度任务服务 里的远程任务。
如果你现在正在使用其他的Redis的Java客户端,那么Redis命令和Redisson对象匹配列表 能够帮助你轻松的将现有代码迁徙到Redisson框架里来。
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。
2、整合
1、方式一:使用 Redisson
1、在 pom 文件中引入依赖:
<!-- 引入 redisson 依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.5</version>
</dependency>
2、配置 redisson
使用配置类的方式配置
@Configuration
public class MyRedissonConfig {
/**
* 对所有的 Redisson 的使用都是通过 RedissonClient 对象
* @return
* @throws IOException
*/
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() throws IOException {
// // 默认连接地址 127.0.0.1:6379
// RedissonClient redisson = Redisson.create();
// 1、创建配置
Config config = new Config();
// Redis url should start with redis:// or rediss:// (for SSL connection)
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
// 2、根据 Config 创建出 RedissonClient 实例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
3、测试
@Autowired
RedissonClient redissonClient;
@Test
public void testRedissonClient(){
System.out.println(redissonClient);
}
参考文档:
2、方式二:使用Redisson/Spring Boot Starter
1、在项目中添加依赖项:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.1</version>
</dependency>
2、添加配置到application.settings
配置文件
# common spring boot settings
spring.redis.database=redis
spring.redis.host=192.168.56.10
spring.redis.port=6379
spring.redis.password=
spring.redis.ssl=
spring.redis.timeout=
spring.redis.cluster.nodes=
spring.redis.sentinel.master=
spring.redis.sentinel.nodes=
# Redisson settings
#path to config - redisson.yaml
spring.redis.redisson.config=classpath:redisson.yaml
3、通过带有RedissonClient
接口或RedisTemplate
/ ReactiveRedisTemplate
对象的spring bean使用Redisson
@Autowired
RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");
//2、加锁
//lock.lock(); //阻塞式等待。默认加的锁都是30s时间
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁;在锁时间到了以后,不会自动续期,自动解锁时间一定要大于业务执行时间
lock.lock(10, TimeUnit.SECONDS);
try {
System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("释放锁..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
参考文档:
4、分布式锁-Redisson-lock锁测试
基于Redis的Redisson分布式可重入锁RLock
Java对象实现了java.util.concurrent.locks.Lock
接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期
。默认情况下,看门狗的检查锁的超时时间是30秒钟
,也可以通过修改Config.lockWatchdogTimeout来另行指定。
接下来我自己来测试一下,首先实现一个简单的测试接口:
@ResponseBody
@GetMapping("/hello")
public String hello() {
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");
//2、加锁
lock.lock(); //阻塞式等待。默认加的锁都是30s时间
try {
//redisson解决了两个问题:
//1)、锁的自动续期,如果业务执行时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉
//2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除
System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//3、解锁 假设当前服务执行时宕机,解锁代码没有运行,redisson会不会出现死锁?
System.out.println("释放锁..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
同时启动两个不同端口的相同服务,记作服务A、B。在请求A、B之后,手动关闭服务A,模拟遭遇宕机解锁代码没有执行的情况,看最后是否解锁,服务B是否可以获得锁:
从上面的执行结果中,可以看到,服务宕机,redisson依然解锁成功。
redisson解决了两个问题:
1)、锁的自动续期,如果业务执行时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉
2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除
这些都是基于看门狗实现的,写一节来了解一下看门狗的实现原理。
参考文档:
5、分布式锁-Redisson-lock看门狗原理-redisson如何解决死锁
Redisson还通过加锁的方法提供了leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
下面我们来测试一下,我们设置10s自动解锁,设置业务执行时间是30s:
@ResponseBody
@GetMapping("/hello")
public String hello() {
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");
//2、加锁
// 加锁以后10秒钟自动解锁
lock.lock(10, TimeUnit.SECONDS);
try {
System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(30000); //业务执行时间30s
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("释放锁..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
执行后的效果:
RLock
对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁
,其他进程解锁则会抛出IllegalMonitorStateException
错误
问题:lock.lock(10, TimeUnit.SECONDS);在锁时间到了以后,不会自动续期
1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
2、如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30000L;【lockWatchdogTimeout看门狗的默认时间】。只要占锁成功,就会启动一个定时任务【重新给锁设定过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期,续成30s,续期时间的间隔是【internalLockLeaseTime(看门狗时间) / 3L】 10s 续期一次
下面来看一下源码:
先看不设置过期时间的加锁方法:lock()
public void lock() {
try {
//leaseTime:-1,在后边的判断会用到;TimeUnit:null;是否可中断:false
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
//看一下再点击来看一下 lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法的实现
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//获取当前线程的id
long threadId = Thread.currentThread().getId();
// 尝试获取锁,这个方法是重点,下面进入这个方法中
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
... //略
}
// 查看 tryAcquire 方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// 进入 尝试获取异步 tryAcquireAsync 这个方法
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
//查看 尝试获取异步 tryAcquireAsync 方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//如果leaseTime不是-1,则进入这个逻辑,根据前面的代码知道lock()默认leaseTime=-1,所以lock()方法不进这个逻辑,所以设置自动过期时间的方法 lock.lock(10, TimeUnit.SECONDS) 是会进入这个逻辑的
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//获取一个 RFuture,和java中的Future是类似的, 设置锁的默认过期时间this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 这个是设置默认锁过期时间,也就是下面Config类中的lockWatchdogTimeout
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//占锁成功,进行监听
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//没有抛出异常说明,占锁成功
if (e == null) {
if (ttlRemaining == null) {
//启动一个定时任务【重新给锁设定过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期,续成30s,下面来看这个方法
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
// 对应上面 this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 中的时间
public Config() {
...
this.lockWatchdogTimeout = 30000L;
...
}
// 时间表到期续订方法
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//进入续期方法
this.renewExpiration();
}
}
//续期方法
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
// this.internalLockLeaseTime / 3L 续期时间,在RedissonLock(CommandAsyncExecutor commandExecutor, String name)方法中可以看到internalLockLeaseTime就是 lockWatchdogTimeout看门狗的默认时间30s,所以是每隔10s续期一次,续成30s
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
...
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
...
}
再来设置过期时间的加锁方法:lock.lock(10, TimeUnit.SECONDS)
public void lock(long leaseTime, TimeUnit unit) {
try {
this.lock(leaseTime, unit, false);
} catch (InterruptedException var5) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//同样是进入tryAcquire尝试获取锁这个方法,和lock()方法一样
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
...
}
//尝试获取锁
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// 进入 尝试获取异步 tryAcquireAsync 这个方法,和lock()方法一样
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
// 尝试获取异步
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
// lock.lock(10, TimeUnit.SECONDS),进入这个逻辑
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
// 尝试获取异步,得到lua脚本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
参考文档:
6、分布式锁-Redisson-读写锁测试
读写锁测试,加读写锁可以保证一定能读到最新数据修改期间,写锁是一个排它锁(互斥锁),读锁是一个共享锁,写锁没释放,读写就必须等待。
- 读+读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。它们都会同时加锁成功
- 写+读:等待写锁释放
- 写+写:阻塞方式
- 读+写:有读锁,写也需要等待
- 只要有写的存在,都必须等待
实现一个write和read接口,分别用来测试写锁和读锁:
@GetMapping("/write")
@ResponseBody
public String writeValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = lock.writeLock();
try {
//1、修改数据加写锁,读数据加读锁
rLock.lock();
s = UUID.randomUUID().toString();
stringRedisTemplate.opsForValue().set("writeValue", s);
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
@GetMapping("/read")
@ResponseBody
public String readValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
//获取读锁
RLock rLock = lock.readLock();
try {
rLock.lock();
s = stringRedisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
测试效果:
参考文档:
7、分布式锁-Redisson-闭锁测试
redisson 的闭锁和 java 中的 java.util.concurrent.CountDownLatch
是类似的。
测试闭锁:
- 1、模拟一个放假锁门的场景
- 2、学校一共5个班,只有等5个班都没人了才可以锁学校大门
/**
* 测试闭锁:锁门方法
*/
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await();//等待闭锁全部完成
return "放假了";
}
/**
* 模拟班级学生全都离开班级的方法
*/
@GetMapping("/go/{id}")
@ResponseBody
public String go(@PathVariable("id") Long id) {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();//每离开一个班就计数减一
return id + "班的人都走了...";
}
测试效果:
参考文档:
8、分布式锁-Redisson-信号量测试
测试信号量:类似于 java 中的java.util.concurrent.Semaphore
模拟车库停车:3个车位,同时只能有3辆车停,只有有车位了才能停车
/**
* 车库停车
* @return
* @throws InterruptedException
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.acquire();//阻塞式的
return "ok";
}
/**
* 车位上的车离开
*/
@GetMapping("/leave")
@ResponseBody
public String leave(){
RSemaphore park = redisson.getSemaphore("park");
park.release();//释放一个车位,释放一个信号量
return "ok";
}
应用:
比如信号量也可以用作分布式限流
的场景,比如同时在线人数只允许100000人等。
参考文档:
五、Spring Cache
1、简介
Spring 从3.1开始定义了org.springframework.cache.Cache
和org.springframework.cache.CacheManager
接口来统一不同的缓存技术;并支持使用 JCache (ISR-107)
注解简化我们开发;
Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合;Cache 接口下 Spring 提供了各种 xxCache
的实现;如 RedisCache
, EhCacheCache
,ConcurrentMapCache
等;
每次调用需要缓存功能的方法时, Spring会检查检查指定参数的指定的目标方法是否已经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取。
使用 Spring 缓存抽象时我们需要关注以下两点:
1、确定方法需要被缓存以及他们的缓存策略
2、从缓存中读取之前缓存存储的数据
2、基础概念
CacheManager 管理众多 Cache。缓存管理器是定义规则的,真正实际上处理缓存的是不同的缓存组件。
代码结构图:
代码模块图:
3、SpringCache-整合
1、整合
1、引入依赖
spring-boot-starter-cache、spring-boot-starter-data-redis(使用redis作为缓存就要引入redis的依赖)
<!-- 引入 spring-boot-starter-cache 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
2、写配置
(1)、自动配置了那些?
org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration
缓存自动配置类
org.springframework.boot.autoconfigure.cache.CacheProperties
所有在xml文件中配置的属性都封装在这里
org.springframework.boot.autoconfigure.cache.CacheConfigurations
获取每一种类型的缓存
org.springframework.boot.autoconfigure.cache.CacheConfigurations
#getConfigurationClass
得到对应缓存的映射
org.springframework.boot.autoconfigure.cache.CacheType
一个枚举类,封装了各种类型的缓存
org.springframework.boot.autoconfigure.cache.RedisCacheConfiguration
使用redis作为缓存时的各种配置
`CacheAutoConfiguration 会导入 RedisCacheConfiguration`
`RedisCacheConfiguration 会自动装配好了redis缓存管理器 RedisCacheManager`
(2)、我们自己需要配置的内容?
配置使用redis作为缓存。在application.properties
、或application.yml
或bootstrap.properties
或配置中心
中配置 spring.cache.type=redis
3、测试使用缓存
@Cacheable
: Triggers cache population. 触发缓存保存
@CacheEvic
t: Triggers cache eviction. 触发删除缓存
@CachePut
: Updates the cache without interfering with the method execution. 更新缓存,而不影响方法的执行
@Caching
: Regroups multiple cache operations to be applied on a method. 重新组合要在一个方法上应用的多个缓存操作
@CacheConfig
: Shares some common cache-related settings at class-level. 在类级别共享一些与缓存相关的常见设置
(1)、开启缓存功能:
在启动类 XxxApplication
上使用 @EnableCaching
注解,开启缓存功能
(2)、在方法上使用 @Cacheable
注解
只需要在需要缓存数据的方法上使用注解就能完成缓存操作
// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表达式,使用调用的方法名作为缓存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {
long l = System.currentTimeMillis();
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
System.out.println("DB耗时:"+(System.currentTimeMillis()-l));
return categoryEntities;
}
2、注解:
对于缓存声明,Spring的缓存抽象提供了一组Java注释:
-
@Cacheable
:触发缓存保存。 -
@CacheEvict
:触发删除缓存。 -
@CachePut
:更新缓存,而不影响方法的执行。 -
@Caching
:重新组合要在一个方法上应用的多个缓存操作。 -
@CacheConfig
:在类级别共享一些与缓存相关的常见设置。
在业务中每一个需要缓存的数据都要指定放到对应的那个名字的缓存中。相当于缓存的分区,一般建议按照业务类型来划分。
(1)、@Cacheable
代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。如果缓存中没有,会调用方法,最后将方法的结果放入缓存。
@Cacheable
的默认行为:
- 如果缓存中有,方法不用调用
- key默认自动生成,缓存名字::SimpleKey [](自动生成的key)
- 缓存的value值,默认使用的是jdk的序列化机制,将序列化后的值存在redis中
- 默认时间ttl=-1
如果我们需要自定义属性,该怎么做呢?
- 指定生成的缓存使用的key:key属性指定,使用spel表达式
SPEL表达式:https://docs.spring.io/spring/docs/5.2.7.RELEASE/spring-framework-reference/integration.html#cache-spel-context- 指定缓存的数据的存活时间:配置文件中修改ttl,spring.cache.redis.time-to-live=3600000
- 将数据保存为json格式
// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表达式,使用调用的方法名作为缓存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {
long l = System.currentTimeMillis();
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
System.out.println("DB耗时:"+(System.currentTimeMillis()-l));
return categoryEntities;
}
(2)、@CacheEvict
@CacheEvict
注解是支持缓存一致性——失效模式
的注解。@CachePut
是支持缓存一致性——双写模式
的注解。
要让一个缓存在更新数据的时候失效,就需要使用@CacheEvict
注解:
清空单个缓存: @CacheEvict(value = {"category"},key = "'getLevel1Categorys'")
/**
* 级联更所有关联数据
* @CacheEvict:缓存一致性——失效模式
* @CachePut:缓存一致性——双写模式
* @param category
*/
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'") //清空单个缓存
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
// 1、先更新当前表的内容
this.updateById(category);
//2、更新级联表的冗余内容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
清空多个缓存:
(1)、同时操作多个缓存:@Caching
@Caching(evict = { //清空多个缓存
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
@CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Caching(evict = { //清空多个缓存
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
@CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
// 1、先更新当前表的内容
this.updateById(category);
//2、更新级联表的冗余内容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
(2) 指定删除某个分区下的所有数据:@CacheEvict(value = {"category"},allEntries = true)
@CacheEvict(value = {"category"},allEntries = true)
//清空整个分区的缓存
@CacheEvict(value = {"category"},allEntries = true) //清空整个分区的缓存
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
// 1、先更新当前表的内容
this.updateById(category);
//2、更新级联表的冗余内容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
所以以后使用缓存可以定义如下规则:
- 1、存储
同一类型
的数据,都可以指定成同一个分区
。分区名默认就是缓存的前缀
,所以不需要需要设置缓存前缀spring.cache.redis.key-prefix=CACHE_
,这样缓存中的键值就是分区名::方法名
- 2、指定删除某个分区下的所有数据:
@CacheEvict(value = {"category"}, allEntries = true)
测试效果:
4、自定义缓存配置
自定义缓存配置,需要定义一个缓存配置类:
@EnableConfigurationProperties(CacheProperties.class) // 让 CacheProperties 的绑定生效
@Configuration
@EnableCaching // 开启缓存(配置在 XxxApplication 主类上也可以)
public class MyCacheConfig {
// @Autowired
// CacheProperties cacheProperties;
/**
* 使用缓存配置类后,缓存配置文件中设置的属性将会失效,比如:
* spring.cache.redis.time-to-live=3600000 # 设置缓存存活时间,单位是ms
* 所以需要另外在这里配置重新绑定
*
* 1、原来和配置文件绑定的配置类是这样子的
* @ConfigurationProperties(
* prefix = "spring.cache"
* )
* public class CacheProperties {
*
* 2、要让它生效:
* 1)、@EnableConfigurationProperties(CacheProperties.class) 让 CacheProperties 的绑定生效
* 2)、注入 CacheProperties 或者在配置方法中加上 CacheProperties 参数
*
* @return
*/
@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
//设置key的序列化
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
//设置value的序列化,使用fastjson
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
// 将配置文件中的所有配置都生效
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
其他的缓存属性配置:
# 设置缓存存活时间,单位是ms
spring.cache.redis.time-to-live=3600000
# 设置缓存的前缀,如果指定了前缀就使用我们指定的前缀,否则就默认使用缓存的名字作为前缀
spring.cache.redis.key-prefix=CACHE_
# 设置是否启用缓存前缀,默认是true
spring.cache.redis.use-key-prefix=true
# 是否缓存空值,设置为true可以防止缓存穿透
spring.cache.redis.cache-null-values=true
自定义测试效果:
5、缓存穿透问题解决
在配置文件中配置允许缓存空值,解决缓存穿透问题
# 是否缓存空值,设置为true可以防止缓存穿透
spring.cache.redis.cache-null-values=true
6、Spring-Cache的不足
原理:
CacheManager
(RedisCacheManager
) --创建-->Cache
(RedisCache
)-->Cache
负责缓存的读写操作
不足:
(1)、读模式
-
缓存穿透
:查询一个null数据。解决:缓存空数据
,添加配置spring.cache.redis.cache-null-values=true
-
缓存击穿
:大量并发进来同时查询一个正好过期的数据。解决:加锁
。但是Spring-Cache默认put时是不加锁的,所以没有办法解决这个问题。但是可以设置sync = true
@Cacheable(value = xxx, key = xxx, sync = true)
,在查缓存的时候调用使用了同步的get方法org.springframework.data.redis.cache.RedisCache#get(java.lang.Object, java.util.concurrent.Callable)
获取到获取到空数据时在put中放一份空的数据。 -
缓存雪崩
:大量的key同时过期。解决:加随机时间
。加上过期时间:spring.cache.redis.time-to-live=3600000
(2)、写模式(缓存与数据库一致)
- 1)
读写加锁
:使用读多写少场景 - 2)`引入Canal:感知到MySQL的更新就去更新缓存
- 3)
读多写多
:直接去数据库查询
总结:
常规数据
(读多写少、即时性、一致性要求不高的数据):完全可以使用Spring-Cache;写模式:只要缓存设置了过期时间就足够了特殊数据
:特殊设计
参考: