典型缓存案例
当我们使用redis做缓存时一般步骤如下
- 请求进来时候首先查询redis判断是否存在缓存且缓存是否过期
- 若已经存在不过期的缓存则直接获取返回
- 若缓存不存在或已过期则重新查询数据库并将该数据存到redis中
代码可以如下表示:
@Autowired
private RedisTemplate redisTemplate;
public List<String> getValueBySql(String key){
System.out.println("这里模拟从数据库中获取数据");
return new ArrayList<>();
}
public List<String> getCache(String key){
List<String> resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(resultList == null || CollectionUtils.isEmpty(resultList)){
//若缓存不存在则从数据库获取并设置时间
resultList = getValueBySql(key);
redisTemplate.opsForValue().set(key, resultList, 1000, TimeUnit.SECONDS);
return resultList;
}else{
return resultList;
}
}
缓存击穿
什么是缓存击穿?
如上面的经典缓存流程,在整个流程中我们需要先查询redis,在redis没有的时候再去查数据库最后再将数据库返回的数据存到redis中。如果有一些非常经常被访问的数据,例如一分钟内有超高的访问请求。试想一下刚某个热点数据key在这个时刻过期。下一时刻有好几个请求同时来请求key,这时候由于redisTemplate.opsForValue().get(key)为空,所有的数据必将直接访问数据库,这个时候大并发的请求可能会瞬间把后端DB压垮
解决方案1: 使用synchronized+双检查机制
此方法适用于单机模式
/***
* synchronized + 双重检查机制
* @param key
* @return
*/
public List<String> getCacheSave(String key){
List<String> resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(resultList == null || CollectionUtils.isEmpty(resultList)){
//采用synchronized保证一次只有一个请求进入到这个代码块
synchronized (this){
resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(CollectionUtils.isEmpty(resultList)){
return resultList;
}
resultList = getValueBySql(key);
redisTemplate.opsForValue().set(key, resultList, 1000, TimeUnit.SECONDS);
return resultList;
}
}else{
return resultList;
}
}
- 上面代码第一个判断保证在缓存有数据时,让查询缓存的请求不必排队,减小了同步的粒度
- synchronized (this)保证查询数据库是同步操作,同一时刻只能有一个请求查询数据库
- 第二个判断保证所有在redis有缓存时,其他请求无需在查意思数据库。若没有这个判断,其他已经等待synchronized 解锁的请求会在请求一次数据库
解决方案2:采用互斥锁
适用于分布式模式
使用分布式锁的方式。如图,使用分布式锁保证只有一个线程查询数据库,其他线程采用重试的方式进行获取
代码参考如下
/***
*
* @param key
* @param retryCount 重试次数
* @return
* @throws InterruptedException
*/
public List<String> getCacheSave2(String key,int retryCount) throws InterruptedException {
List<String> resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(CollectionUtils.isEmpty(resultList)){
final String mutexKey = key + "_lock";
boolean isLock = (Boolean) redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
//只在键key不存在的情况下,将键key的值设置为value,若键key已经存在,则 SETNX 命令不做任何动作
//命令在设置成功时返回 1 , 设置失败时返回 0
return connection.setNX(mutexKey.getBytes(),"1".getBytes());
}
});
if(isLock){
//设置成1秒过期
redisTemplate.expire(mutexKey, 1000, TimeUnit.MILLISECONDS);
resultList = getValueBySql(key);
redisTemplate.opsForValue().set(key, resultList, 1000, TimeUnit.SECONDS);
redisTemplate.delete(mutexKey);
}else{
//线程休息50毫秒后重试
Thread.sleep(50);
retryCount--;
System.out.println("=====进行重试,当前次数:" + retryCount);
if(retryCount == 0){
System.out.println("====这里发邮件或者记录下获取不到数据的日志,并为key设置一个空置防止重复获取");
List<String> list = Lists.newArrayList("no find");
redisTemplate.opsForValue().set(key, list, 1000, TimeUnit.SECONDS);
return list;
}
return getCacheSave2(key,retryCount);
}
}
return resultList;
}
解决方案3:提前设置锁
这是网上看到的方案
https://carlosfu.iteye.com/blog/2269687
感觉还是采用分布式锁的方式,只不过是每次获取的时候先获取一下key的过期时间,如果过期时间快到了就提前重新设置下超时时间,并从数据库中获取最新的数据覆盖
解决方案:资源保护
采用netflix的hystrix,可以做资源的隔离保护主线程池(不懂,后面学习下)
缓存雪崩
什么是缓存雪崩?
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决方案:在设置过期时间时加随机值保证不同时失效
缓存失效时的雪崩效应对底层系统的冲击非常可怕。大多数系统设计者考虑用加锁或者队列的方式保证缓存的单线程(进程)写,从而避免失效时大量的并发请求落到底层存储系统上。这里分享一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件
缓存击穿
例如上面的经典流程,如果我输入一个不在我们规划范围的key,也就是说这个key永远也查不到数据,则按照流程每次都要先去查数据库,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。
解决方案1:设置白名单
设置key的白名单,只有在白名单的key才能允许查询(如果key的数量很多或key不是事先知道的情况下这种方式就不太好用)。或者更高级点用布隆过滤器记录所有可能的key,每次请求时进行拦截