概述
今天我们来讲一下如何使用实现分布式锁
要了解如何实现分布式锁。我们首先要了解什么是redis分布式锁
什么是redis分布式锁?
redis分布式锁首先是一个分布式锁。而分布式锁又是什么呢?
简单来说分布式锁是在分布式的环境下,一个方法同一时间只能被一台机器的一个线程所使用。
举一个通俗但是有点不太优雅的例子就是:厕所的隔间一次只能有一个人使用。如果太多人来的话。后面的人只有隔一段时间再来。或者离开。
而redis分布式锁就是使用redis的技术实现分布式锁。
下面我们先来看下redis实现分布式锁的远离。
redis实现分布式锁原理
为了方便讲述,我们就假设我们现在需要对给商品A库存变更进行加锁。防止并发导致库存更新错误。
并假设商品A
加锁:把商品A的ID或者是SKU(后面统称key)这类的唯一数据作为key存入redis中,并且赋予一个值(一般是唯一的,比如时间戳,后面统称value)
拿锁:把key和value进行保存
解锁:当一个已经拿到锁的请求并且执行完修改库存的操作之后,那么这个请求需要使用key获取锁,并检查锁的value是不是和自己拿到锁的value一致如果一致则删除锁
知道了原理之后我们就可以看下redis中有那些方法是可以做到这些操作的了。
按照上面原理我们首先很容易可以想到使用先使用get进行检查值是否存在,然后使用set进行加锁操作,然后使用del进行解锁操作。
能这样想说明年轻人你是有前途的。
但是我们可以想一下,我们使用redis锁是为了什么?是为了让一个方法在同一时间只能被一个线程的所访问 按照我们上面的做法get方法检查和set 方法设置值是两个操作。并不是一起执行的,那么中间就还是有可能出先两个线程同时都获取到锁的现象。所以这种做法还是有点问题的。那么怎么样才是正确的呢?
我们翻开redis的官方文档。(因为本人英语实在捉急,所以使用的是国内翻译过的文档,这里奉上网站redis中国,在这里可以看到很多的redis命令。小伙伴们可以多来看看)
这里set方法新加入了两个命令,分别是px 和 nx 这两个是什么意思呢。我们来看一下。
nx:只有key不存在的时候才会设置key的值
px:设置key的过期时间,单位为毫秒
那么我们知道这两个命令之后,我们就可以组合出这样一个命令
SET SHOP-A-KEY SHOP-A-VALUE NX
这个命令的意思是:当key为‘SHOP-A-KEY’没有值的时候则设置‘SHOP-A-VALUE’为值,并且过期时间为50000毫秒,如果‘SHOP-A-KEY’已经有值了则设置失败。
你们看这样子是不是我们就使用一个命令完成了加锁的操作。完美解决了上面的问题
我们来实际看下效果
结果也是符合我们的预期的。
但是这样加锁就完美了吗?
我们再来想一下,假如一个线程获得了锁,然后在下面的程序执行的过程中出现了异常,没有走到解锁的环节,那会出现什么情况呢?
答案是后面的所有线程都会取不到锁。这也就出现了我们说的死锁了。
那么要怎么解决呢?还记的我们上文的两个命令吗?其中NX我们已经用到了,那么接下来自然就可以使用PX进行解决了。
我们从上面可以知道px是为给值一个时间,时间到了之后自动删除。那么我们就可以在加锁的时候预估一下我们现在执行的程序最长需要多长时间给他设置一个时间。这样即使程序出错了。到了时间之后,后面的线程还是可以正常的获取值。这样就不会出现死锁了。
经过上面的操作我们的加锁是基本上已经可以了。接下来我们来看下解锁。
有的人会说了。解锁不就是删除值不就行了。还需要什么特殊操作吗?
答案是一般是不需要的。
我们设想一种情况。
1、假设有线程a和线程b同时进入一个方法a
2、线程a首先取得了锁,锁的时间是2s,并且进入方法a。
3、此时线程a执行方法a的时候遇到了一些问题执行的时间变长了,但是呢并没有报异常。还是继续执行。并且时间已经超过了2s线程a的锁已经失效了。
4、然后这是线程b这时也获取了锁并且进入了方法a。
5、这时线程a终于执行完了,然后线程a就把锁取消了,但是线程a此时并不知道,他取消的不是自己的锁,而是线程b的锁。这时线程b还在执行,但是锁已经没有了。就出现了问题。
为了防止上面的情况我们就需要在删除的时候先检查一下这个锁是不是本线程的。是再删,不是则不管。对吧。我们不能乱拿别人的东西。
比如:我们可以使用我们加锁的时候可以设置一个唯一的值,比如时间戳。之后解锁的时候先比对一下缓存终的值是不是我们设置的值。
那要怎么做呢?
这里有的人要说了。我们先get出来值然后判断一下,然后再删掉不就行了吗?
答案是不完全对。还记的我们上面说的要保证原子性吗?
这里我们可以采用执行LUA脚本的方式。
至于LUA脚本是什么。这里就不详细讲了(其实我也不知道)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这里其实就是使用一个语句将检查和删除全部解决了。这样就保证了操作的原子性。这里我们再后面的抄作业环节中再看效果。。
代码展示
jedis
jedis是redis官方java使用的链接工具。内部集成了很多命令的方法。我们先看下使用jedis如何实现分布式锁。
jedis连接池单例
/**
* @Author: buding
* @DateTime: 2020/3/6 3:43 下午
*/
public class JedisUtils {
// 地址
public final static String host = "127.0.0.1";
// 端口
public final static Integer port = 6379;
// 密码
public final static String auth = "xx";
private Jedis instance = null;
private static JedisPool jedisPool = null;
private JedisUtils() {
}
public static Jedis getInstance() {
if (jedisPool == null) {
JedisPoolConfig config = new JedisPoolConfig();
// 最大空闲数
config.setMaxIdle(10);
// 总数
config.setMaxTotal(20);
// 等待时间
config.setMaxWaitMillis(50000);
jedisPool = new JedisPool(config, host, port, 50000, auth);
}
return jedisPool.getResource();
}
}
加锁
public static Boolean lock(String key, String value, Long expiredTime) {
Jedis jedis = JedisUtils.getInstance();
String result = jedis.set(key, value, new SetParams().nx().px(expiredTime));
jedis.close();
return null != result ? true : false;
}
解锁
public static Boolean unLock(String key, String value) {
Jedis jedis = JedisUtils.getInstance();
// lua语句,保证原子性
String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
List<String> keyList = new ArrayList<String>();
keyList.add(key);
List<String> argvList = new ArrayList<String>();
argvList.add(value);
Object result = jedis.eval(lua, keyList, argvList);
return result.equals(1) ? true : false;
}
使用测试
public static void main(String[] args) {
final String key = "shop-a-key";
final String value = "shop-a-value";
Long expiredTime = 30000L;
System.out.println("========加锁========");
Boolean result = RedisUtils.lock(key, value, expiredTime);
System.out.println("========加锁完毕========");
new Thread(new Runnable() {
public void run() {
try {
System.out.println("========执行业务========");
// 模拟业务执行
Thread.sleep(5000);
System.out.println("========业务执行完毕========");
System.out.println("========开始解锁========");
if (RedisUtils.unLock(key, value)) {
System.out.println("========解锁完毕========");
} else {
System.out.println("========解锁失败========");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
结果
使用redisTemplate实现分布式锁
redisTemplate是spring大家族的一部分,提供了在srping应用中通过简单的配置访问redis服务,对reids底层开发包(Jedis, JRedis, and RJC)进行了高度封装,RedisTemplate提供了redis各种操作、异常处理及序列化,支持发布订阅
加锁
@Override
public Boolean lock(String key, String value) {
// 判断是不是可以加锁成功
if (redisTemplate.opsForValue().setIfAbsent(key, value, 300L, TimeUnit.SECONDS)) {
return true;
} else {
return false;
}
}
解锁
@Override
public void unlock(String key, String value) {
String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(lua,Long.class);
redisScript.setScriptText(lua);
redisTemplate.execute(redisScript, Collections.singletonList(key), value);
}
```
**测试**
```java
@GetMapping("redis")
public Response redisTest() {
final String key = "shop-a-key";
final String value = "shop-a-value";
System.out.println("========加锁========");
if (redisService.lock(key, value)) {
System.out.println("========加锁完毕========");
} else {
System.out.println("========加锁失败========");
}
new Thread(new Runnable() {
public void run() {
try {
System.out.println("========执行业务========");
// 模拟业务执行
Thread.sleep(5000);
System.out.println("========业务执行完毕========");
System.out.println("========开始解锁========");
redisService.unlock(key, value);
System.out.println("========解锁完毕========");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
return Response.success();
}
结果
好了,本文给大家讲了一下redis的分布式锁的大概原理,和使用jedis和redisTemplete实现redis分布式锁的方法。如果有什么问题,希望大家能指出。谢谢🙏