缓存击穿解决方案
解决方案分别有:
- 后台刷新
- 检查更新
- mysql分布式锁
- redis分布式锁
- zookeeper分布式锁
方案一:后台刷新
后台定义一个job(定时任务)专门主动更新缓存数据.比如,一个缓存中的数据过期时间是30分钟,那么job每隔29分钟定时刷新数据(将从数据库中查到的数据更新到缓存中).
这种方案比较容易理解,但会增加系统复杂度。比较适合那些 key 相对固定,cache 粒度较大的业务,key 比较分散的则不太适合,实现起来也比较复杂。
方案二:检查更新
将缓存key的过期时间(绝对时间)一起保存到缓存中(可以拼接,可以添加新字段,可以采用单独的key保存..不管用什么方式,只要两者建立好关联关系就行).在每次执行get操作后,都将get出来的缓存过期时间与当前系统时间做一个对比,如果缓存过期时间-当前系统时间<=1分钟(自定义的一个值),则主动更新缓存.这样就能保证缓存中的数据始终是最新的(和方案一一样,让数据不过期.)
这种方案在特殊情况下也会有问题。假设缓存过期时间是12:00,而 11:59
到 12:00这 1 分钟时间里恰好没有 get 请求过来,又恰好请求都在 11:30 分的时
候高并发过来,那就悲剧了。这种情况比较极端,但并不是没有可能。因为“高
并发”也可能是阶段性在某个时间点爆发。
方案三: 分级缓存
采用 L1 (一级缓存)和 L2(二级缓存) 缓存方式,L1 缓存失效时间短,L2 缓存失效时间长。 请求优先从 L1 缓存获取数据,如果 L1缓存未命中则加锁,只有 1 个线程获取到锁,这个线程再从数据库中读取数据并将数据再更新到到 L1 缓存和 L2 缓存中,而其他线程依旧从 L2 缓存获取数据并返回。
这种方式,主要是通过避免缓存同时失效并结合锁机制实现。所以,当数据更
新时,只能淘汰 L1 缓存,不能同时将 L1 和 L2 中的缓存同时淘汰。L2 缓存中
可能会存在脏数据,需要业务能够容忍这种短时间的不一致。而且,这种方案
可能会造成额外的缓存空间浪费。
方案四:Mysql分布式锁
基于mysql数据库实现分布式锁主要有两种方式:一种是基于数据库表实现的乐观锁和悲观锁,另一种是基于Mysql自带的悲观锁,下面就来一起看一看这几种实现方式及其差异和使用场景.
1.基于数据库表
1.1 悲观锁
Mysql实现分布式悲观锁:直接创建一张锁表,然后通过操作该表中的数据来实现了。当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
创建这样一张数据库表:
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
当我们想要锁住某个方法时,执行以下SQL:
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
delete from methodLock where method_name ='method_name'
上面这种简单的实现有以下几个问题:
①这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
②这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
③这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
④这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
针对上面的问题,我们可以对症下药:
①数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
②没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
③非阻塞的?搞一个while循环,直到insert成功再返回成功。
④非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
但无论如何,Mysql数据库的性能和效率大家心里都有点abcd数的,在高并发的情况下, 用Mysql做分布式锁,无异于是找死...
1.2 乐观锁
大多数是基于数据版本(version)的记录机制实现的.何谓数据版本号?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个 “version”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1.在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败.
对乐观锁的含义有了一定的了解后,结合具体的例子,我们来推演下我们应该怎么处理:
假设我们有一张资源表,态(1未分配 2已分配)、资源创建时间、资源更新时间、资源数据版本号.
据进行分配,假设我们现在我们对id=5780这条数那么非分布式场景的情况下,我们一般先查询出来state=1(未分配)的数据,然后从其中选取一条数据可以通过以下语句进行,如果可以更新成功,那么就说明已经占用了这个资源
update t_resource set state=2 where state=1 and id=5780。
如果在分布式场景中,由于数据库的update操作是是原子的,其实上边这条语句理论上也没有问题,但是这条语句如果在典型的“ABA”情况下,我们是无法感知的。有人可能会问什么是“ABA”问题呢?大家可以网上搜索一下,这里我说简单一点就是,如果在你第一次select和第二次update过程中,由于两次操作是非原子的,所以这过程中,如果有一个线程,先是占用了资源(state=2),然后又释放了资源(state=1),实际上最后你执行update操作的时候,是无法知道这个资源发生过变化的。也许你会说这个在你说的场景中应该也还好吧,但是在实际的使用过程中,比如银行账户存款或者扣款的过程中,这种情况是比较恐怖的.
那么如果使用乐观锁我们如何解决上边的问题呢?
a. 先执行select操作查询当前数据的数据版本号,比如当前数据版本号是26:
select id, resource, state,version from t_resource where state=1 and id=5780;
b. 执行更新操作:
update t_resoure set state=2, version=27, update_time=now() where resource=xxxxxx and state=1 and version=26
c.如果上述update语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。
通过上面的讲解,相信大家已经对如何基于数据库表做乐观锁有有了一定的了解了,但是这里还是需要说明一下基于数据库表做乐观锁的一些缺点:
①这种操作方式,使原本一次的update操作,必须变为2次操作: select版本号一次;update一次。增加了数据库操作的次数。
②如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。
③乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。
2.基于Mysql自带的悲观锁实现
利用for update加显式的行锁,这样就能利用这个行级的排他锁来实现分布式锁了,同时unlock的时候只要释放commit这个事务,就能达到释放锁的目的。
/**
* 超时获取锁
* @param lockID
* @param timeOuts
* @return
* @throws InterruptedException
*/
public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {
String sql = "SELECT id from test_lock where id = ? for UPDATE ";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
connection.setAutoCommit(false);
while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
if (ifsucess)
return true;
} catch (SQLException e) {
e.printStackTrace();
}
latch.await(timerange, TimeUnit.MILLISECONDS);
ranmain = futureTime - System.currentTimeMillis();
if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}
continue;
}
return false;
}
/**
* 释放锁
* @param lockID
* @return
* @throws SQLException
*/
public void unlockforUpdtate(String lockID) throws SQLException {
connection.commit();
}
- 优点:实现简单
- 缺点:连接池爆满和事务超时的问题单点的问题,单点问题,行锁升级为表锁的问题,并发量大的时候请求量太大、没有线程唤醒机制。
连接池爆满和事务超时的问题单点的问题:利用事务进行加锁的时候,query需要占用数据库连接,在行锁的时候连接不释放,这就会导致连接池爆满。同时由于事务是有超时时间的,过了超时时间自动回滚,会导致锁的释放,这个超时时间要把控好。
适用场景:并发量略高于上面使用乐观锁的情况下,可以采用这种方法.
总结:不论如何,使用Mysql来实现分布式锁都不推荐,其性能,可靠性,以及实现上跟其它两种方式对比均没啥优势,正如我开篇所述,学习Mysql实现分布式锁可以仅仅作为一种了解和思想升华.
方案五:Redis分布式锁
一: Redis 分布式锁原理一
setnx 是『SET if Not eXists』(如果不存在,则 SET)的简写。 命令格式:SETNX key value;使用:只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。返回值:命令在设置成功时返回 1 ,设置失败时返回 0 。
expire 命令格式:EXPIRE key seconds,使用:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。返回值:设置成功返回 1 。 当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。
del 命令格式:DEL key [key …],使用:删除给定的一个或多个 key ,不存在的 key 会被忽略。返回值:被删除 key 的数量。
过程分析:
1.A尝试去获取锁lockkey,通过setnx(lockkey,currenttime+timeout)命令,对lockkey进行setnx,将value值设置为当前时间+锁超时时间;
2.如果返回值为1,说明redis服务器中还没有lockkey,也就是没有其他用户拥有这个锁,A就能获取锁成功;
3.进行相关业务执行之前,先执行expire(lockkey),对lockkey设置有效期,防止死锁。因为如果不设置有效期的话,lockkey将一直存在于redis中,其他用户尝试获取锁时,执行到setnx(lockkey,currenttime+timeout)时,将不能成功获取到该锁;
4.执行相关业务;
5.释放锁,A完成相关业务之后,要释放拥有的锁,也就是删除redis中该锁的内容,del(lockkey),接下来的用户才能进行重新设置锁新值。
代码实现:
public void redis1() {
log.info("关闭订单定时任务启动");
long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
//这个方法的缺陷在这里,如果setnx成功后,锁已经存到Redis里面了,服务器异常关闭重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁
Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
if (setnxResult != null && setnxResult.intValue() == 1) {
//如果返回值为1,代表设置成功,获取锁
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
log.info("关闭订单定时任务结束");
}
private void closeOrder(String lockName) {
//对锁设置有效期
RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
//执行业务
int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
iOrderService.closeOrder(hour);
//执行完业务后,释放锁
RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
log.info("=================================");
}
缺陷:
如果A在setnx成功后,A成功获取锁了,也就是锁已经存到Redis里面了,此时服务器异常关闭或是重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁。
解决方法:
关闭Tomcat有两种方式,一种通过温柔的执行shutdown关闭,一种通过kill杀死进程关闭
//通过温柔的执行shutdown关闭时,以下的方法会在关闭前执行,即可以释放锁,而对于通过kill杀死进程关闭时,以下方法不会执行,即不会释放锁
//这种方式释放锁的缺点在于,如果关闭的锁过多,将造成关闭服务器耗时过长
@PreDestroy
public void delLock() {
RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
二:Redis 分布式锁原理2(优化版)
为了解决原理1中会出现的死锁问题,提出原理2双重防死锁,可以更好解决死锁问题。
原理图如下:
- getset 命令格式:GETSET key value,将键 key 的值设为 value ,并返回键 key 在被设置之前的旧的value。返回值:如果键 key 没有旧值, 也即是说, 键 key 在被设置之前并不存在, 那么命令返回 nil 。当键 key 存在但不是字符串类型时,命令返回一个错误。
过程分析:
1.当A通过setnx(lockkey,currenttime+timeout)命令能成功设置lockkey时,即返回值为1,过程与原理1一致;
2.A通过setnx(lockkey,currenttime+timeout)命令不能成功设置lockkey时,这是不能直接断定获取锁失败;因为我们在设置锁时,设置了锁的超时时间timeout,当当前时间大于redis中存储键值为lockkey的value值时,可以认为上一任的拥有者对锁的使用权已经失效了,A就可以强行拥有该锁;具体判定过程如下;
3.A通过get(lockkey),获取redis中的存储键值为lockkey的value值,即获取锁的相对时间lockvalueA.
4.lockvalueA!=null && currenttime>lockvalue,A通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;
5.步骤4返回结果为true后,通过getSet设置新的超时时间,并返回旧值lockvalueB,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
6.lockvalueB == null || lockvalueA==lockvalueB,判断:若果lockvalueB为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的lockvalueB一致说明中间未被其他进程获取该锁,可以获取锁;否则不能获取锁,结束,获取锁失败。
代码实现:
public void redis2() {
log.info("关闭订单定时任务启动");
long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
if (setnxResult != null && setnxResult.intValue() == 1) {
//如果返回值为1,代表设置成功,获取锁
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
//未获取到锁,继续判断,判断时间戳,看是否可以重置并获取到锁
String lockValueStr = RedisShardedPoolUtil.get(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
//通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,不能获取该锁
if (lockValueStr != null && System.currentTimeMillis() > Long.parseLong(lockValueStr)) {
//通过getSet设置新的超时时间,并返回旧值,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
String getSetResult = RedisShardedPoolUtil.getSet(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
//再次用当前时间戳getset。
//返回给定的key的旧值,与旧值判断,是否可以获取锁
//当key没有旧值时,即key不存在时,返回nil ->获取锁
//这里我们set了一个新的value值,获取旧的值。
//若果getSetResult为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的getSetResult一致说明中间未被其他进程获取该锁,可以获取锁
if (getSetResult == null || (getSetResult != null && StringUtils.equals(lockValueStr, getSetResult))) {
//真正获取到锁
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
} else {
log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
}
log.info("关闭订单定时任务结束");
}
private void closeOrder(String lockName) {
//对锁设置有效期
RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
//执行业务
int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
iOrderService.closeOrder(hour);
//执行完业务后,释放锁
RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
log.info("=================================");
}
优化点:
加入了超时时间判断锁是否超时了,及时A在成功设置了锁之后,服务器就立即出现宕机或是重启,也不会出现死锁问题;因为B在尝试获取锁的时候,如果不能setnx成功,会去获取redis中锁的超时时间与当前的系统时间做比较,如果当前的系统时间已经大于锁超时时间,说明A已经对锁的使用权失效,B能继续判断能否获取锁,解决了redis分布式锁的死锁问题。
三:Redis Lua脚本实现分布式锁
主要思路:
通过 set key value px milliseconds nx 命令实现加锁, 通过Lua脚本实现解锁。核心实现命令如下:
这种实现方式主要有以下几个要点:
- set 命令要用 set key value px milliseconds nx,替代 setnx + expire 需要分两次执行命令的方式,保证了原子性,
- value 要具有唯一性,可以使用UUID.randomUUID().toString()方法生成,用来标识这把锁是属于哪个请求加的,在解锁的时候就可以有依据;
- 释放锁时要验证 value 值,防止误解锁;
- 通过 Lua 脚本来避免 Check And Set 模型的并发问题,因为在释放锁的时候因为涉及到多个Redis操作 (利用了eval命令执行Lua脚本的原子性);
加锁代码分析
首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,用来标识这把锁是属于哪个请求加的,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。
解锁代码分析
将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。在执行的时候,首先会获取锁对应的value值,检查是否与requestId相等,如果相等则解锁(删除key)。
这种方式仍存在单点风险
以上实现在 Redis 正常运行情况下是没问题的,但如果存储锁对应key的那个节点挂了的话,就可能存在丢失锁的风险,导致出现多个客户端持有锁的情况,这样就不能实现资源的独享了。
所以在这种实现之下,不论Redis的部署架构是单机模式、主从模式、哨兵模式还是集群模式,都存在这种风险。因为Redis的主从同步是异步的。 运行的是,Redis 之父 antirez 提出了 redlock算法 可以解决这个问题。
四、 Redisson 实现分布式可重入锁及源码分析 (RedissonLock)
高效分布式锁
当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。
1、互斥
在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。2、防止死锁
在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。
所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。
3、性能
对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。所以在锁的设计时,需要考虑两点。
- 锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。
2.锁的范围尽量要小。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。
4、重入
我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。针对以上Redisson都能很好的满足,下面就来分析下它。
Redisson原理分析
1、加锁机制
线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。2、watch dog自动延期机制
这个比较难理解,找了些许资料感觉也并没有解释的很清楚。这里我自己的理解就是:
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
但在实际开发中会有下面一种情况:
//设置锁1秒过去
redissonLock.lock("redisson", 1);
/**
* 业务逻辑需要咨询2秒
*/
redissonLock.release("redisson");
/**
* 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
* 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
* 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
*/
所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。
注意 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。
- 3、为啥要用lua脚本呢?
这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
- 4、可重入加锁机制
1、Redis存储锁的数据类型是 Hash类型
2、Hash数据类型的key值包含了当前线程信息。
下面是redis存储的数据:
这里表面数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>> 类型,这里key是指 'redisson'
它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:
guid + 当前线程的ID。后面的value是就和可重入加锁有关。
举图说明
上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。
- 5、Redis分布式锁的缺点
Redis分布式锁会有个缺陷,就是在Redis哨兵模式下:
客户端1 对某个master节点写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。
这时客户端2 来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
缺陷在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。
Redisson 分布式重入锁用法
加锁源码分析
1.通过 getLock 方法获取对象
2.通过tryLock方法尝试获取锁
其中 tryAcquire 内部通过调用 tryLockInnerAsync 实现申请锁的逻辑。申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请则直接获取并返回,如果获取到时间,则进入等待竞争逻辑。
加锁流程图:
五、用 Redisson 实现分布式锁(红锁 RedissonRedLock)
Redis 官网对 redLock 算法的介绍大致如下:
在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在我们的例子里面我们把N设成5,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。为了取到锁,客户端应该执行以下操作:
1.获取当前Unix时间,以毫秒为单位。
2.依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个尝试从某个Reids实例获取锁的最大等待时间(超过这个时间,则立马询问下一个实例),这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
3.户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁消耗的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的总耗时小于锁失效时间时,锁才算获取成功。
4.如果取到了锁,key的真正有效时间 = 有效时间(获取锁时设置的key的自动超时时间) – 获取锁的总耗时(询问各个Redis实例的总耗时之和)(步骤3计算的结果)。
5.如果因为某些原因,最终获取锁失败(即没有在至少 “N/2+1 ”个Redis实例取到锁或者“获取锁的总耗时”超过了“有效时间”),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
最核心的变化就是需要构建多个 RLock ,然后根据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建立在多个互相独立的 Redis 环境之上的(为了区分可以叫为 Redission node),Redission node 节点既可以是单机模式(single),也可以是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。这就意味着,不能跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 额外搭建多几套独立的 Redission 节点。 比如可以搭建3个 或者5个 Redission节点,具体可看视资源及业务情况而定。
下图是一个利用多个 Redission node 最终 组成 RedLock分布式锁的例子,需要特别注意的是每个 Redission node 是互相独立的,不存在任何复制或者其他隐含的分布式协调机制。
原文
https://www.jianshu.com/p/d00348a9eb3b
https://blog.csdn.net/lovexiaotaozi/article/details/83819916