前言
对于分布式锁的问题我也查过很多资料,感觉很多方式实现的并不完善,或者看着云里雾里的,不知所以然,于是就整理了这篇文章,希望对您有用,有写的不对的地方,欢迎留言指正。
首先咱们来聊聊什么是分布式锁,到底解决了什么问题?直接看代码
$stock = $this->getStockFromDb();//查询剩余库存
if ($stock>0){
$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{
echo "库存不足";
}
很简单的一个场景,用户下单,咱们查询商品库存够不够,不够的话直接返回库存不足类似的错误信息,如果库存够的话直接在数据库中库存-1,然后返回成功,在业务逻辑上这段代码是没有什么问题的。
但是,这段代码是存在严重的问题的。
如果库存只剩 1,并且在并发比较高的情况下,比如两个请求同时执行了这段代码,同时查到库存为 1,然后顺利成章的都去数据库执行 stock-1 的操作,这样库存就会变成-1,然后就会引发超卖的现象,刚才说的是两个请求同时执行,如果同时几千个请求打过来,可见造成的损失是非常大的。于是呢有些聪明人就想了个办法,办法如下。
大家都知道 redis 有个 setnx 命令,不知道的话也没关系,我已经帮你查过了
我们把上面的代码优化一下
version-1
$lock_key="lock_key";
$res = $redis->setNx($lock_key, 1);
if (!$res){
return "error_code";
}
$stock = $this->getStockFromDb();//查询剩余库存
if ($stock>0){
$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{
echo "库存不足";
}
$redis->delete($lock_key);
第一次请求进来会去 setNx,当然结果是返回 true,因为 lock_key 不存在,然后下面业务逻辑正常进行,任务执行完了之后把lock_key删除掉,这样下一次请求进来重复上述逻辑
第二次请求进来同样会去执行 setNx,结果返回 false,因为lock_key已经存在,然后直接返回错误信息(你双11抢购秒杀产品的时候给你返回的系统繁忙就是这么来的),不执行库存减 1 的操作
- 有的同学可能有疑惑,咱们不是说高并发的情况下么?要是两个请求同时 setNx 的话获取的结果不都是 true 了,同样会同时去执行业务逻辑,问题不是一样没解决么?但是大家要明白 redis 是单线程的,具备原子性,不同的请求执行 setnx 是顺序执行的,所以这个是不用担心的。
看似问题解决了,其实并不然。
我们这里伪代码写的简单,查询一下库存,然后减1操作而已,但是真实的生产环境中的情况是非常复杂的,在一些极端情况下,程序很可能会报错,崩溃,如果第一次执行加锁了之后程序报错了,那这个锁永远存在,接下来的请求永远也请求不进来了,所以咱们继续优化
version-2
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;//新加入过期时间,这样锁不会一直占有
$res = $redis->setNx($lock_key, 1, $expire_time);
if (!$res){
return "error_code";
}
$stock = $this->getStockFromDb();//查询剩余库存
if ($stock>0){
$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{
echo "库存不足";
}
}finally {
$redis->delete($lock_key);
}
- 在setnx的时候给加上过期时间,这样至少不会让锁一直存在成为死锁
- 做try catch处理,万一程序抛出异常把锁删掉,也是为了解决死锁问题
这次是把死锁问题解决了,但是问题还是存在,大家可以先想一想还存在什么问题再接着往下看。
存在的问题如下
- 我们的过期时间是5秒钟,万一这个请求执行了6秒钟怎么办?超出的那一秒,跟没有加锁有什么区别?其实不仅仅如此,还有一个更严重的问题存在。比如第二个请求也是执行6秒,那么在第二个请求在超出的那1秒才进来的时候,第一个请求执行完了,当然会删除第二个请求加的锁,如果一直并发都很大的话,锁跟没有加没什么区别。
针对上述问题,最直接的办法是加长过期时间,但是这个不是解决问题的最终办法。把时间设置过长也会产生新的问题,比如各种原因机器崩溃了,需要重启,然后你把锁设置的时间是1年,同时也没有delete掉,难道机器重启了再等一年?另外这样设置固定值的解决方案在计算机当中是不允许的,曾经的“千年虫”问题就是类似的原因导致的
在加超时时间的时候一定要注意一定是一次性加上,保证其原子性,不要先setnx之后,再设置expire_time,这样的话万一在setnx之后那一个瞬间系统挂了,这个锁依然会成为一个永久的死锁
其实上述问题的主要原因在于,请求1会删掉请求2的锁,所以说锁需要保证唯一性。
咱们接着优化
version-3
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;//新加入过期时间,这样锁不会一直占有
$client_id = session_create_id(); //对每个请求生成唯一性的id
$res = $redis->setNx($lock_key, $client_id, $expire_time);
if (!$res){
return "error_code";
}
$stock = $this->getStockFromDb();//查询剩余库存
if ($stock>0){
$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{
echo "库存不足";
}
}finally {
if ($redis->get($lock_key) == $client_id){ //在这里加一个判断,保证每次删除的锁是当次请求加的锁,这样避免误删了别的请求加的锁
$redis->delete($lock_key);
}
}
我们在每个请求生成了唯一client_id,并且把该值写入了lock_key中
在最后删除锁的时候会先判断这个lock_key是否是该请求生成的,如果不是的话则不会删除
但是上面方案还有问题,我们看最后 redis是先进行了get操作判断,然后再删除,是两步操作,并没有保证其原子性,redis的多步操作可以用lua脚本来保证原子性,其实看到lua也不需要感觉太陌生,他就是一种语言而已,在这里的作用是把多个redis操作打包成一个命令去执行,保证了原子性而已
version-4
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;//新加入过期时间,这样锁不会一直占有
$client_id = session_create_id(); //对每个请求生成唯一性的id
$res = $redis->setNx($lock_key, $client_id, $expire_time);
if (!$res){
return "error_code";
}
$stock = $this->getStockFromDb();//查询剩余库存
if ($stock>0){
$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{
echo "库存不足";
}
}finally {
$script = ' //此处用lua脚本执行是为了get对比之后再delete的两步操作的原子性
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
';
return $instance->eval($script, [$lock_key, $client_id], 1);
}
这样封装之后,分布式锁应该就比较完善了。当然我们还可以进一步的优化一下用户体验
- 现在比如一个请求进来之后,如果请求被锁住,会立即返回给用户请求失败,请重新尝试,我们可以适当的延长一点这个时间,不要立即返回给用户请求失败,这样体验会更好
- 具体方式为用户请求进来如果遇到了锁,可以适当的等待一些时间之后重试,重试的时候如果锁释放了,则这次请求就可以成功
version-5
$retry_times = 3; //重试次数
$usleep_times = 5000;//重试间隔时间
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;//新加入过期时间,这样锁不会一直占有
while($retry_times > 0){
$client_id = session_create_id(); //对每个请求生成唯一性的id
$res = $redis->setNx($lock_key, $client_id, $expire_time);
if ($res){
break;
}
echo "尝试重新获取锁";
$retry_times--;
usleep($usleep_times);
}
if (!$res){ //重试三次之后都没有获取到锁则给用户返回错误信息
return "error_code";
}
$stock = $this->getStockFromDb();//查询剩余库存
if ($stock>0){
$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{
echo "库存不足";
}
}finally {
$script = ' //此处用lua脚本执行是为了get对比之后再delete的两步操作的原子性
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
';
return $instance->eval($script, [$lock_key, $client_id], 1);
}
当然上面的分布式锁还是不够完善的,比如redis主从同步延迟,就会产生问题,像java中redission实现的思想是非常好的,大家感兴趣可以看看源码,今天就聊到这里,感兴趣的朋友可以留言大家一起讨论