本篇文章将对redis的分布式锁进行总结
在我们的日常开发中,如果是分布式架构,那么我们经常会遇到分布式锁的问题,我们对于分布式锁通常有三种实现方式:
- 基于数据库实现分布式锁;
- 基于缓存(Redis等)实现分布式锁;
- 基于Zookeeper实现分布式锁;
这里主要介绍基于redis的分布式锁
这里主要是使用jedis实现,依赖如下:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
其次还有jedisPool配置类
@Component
@Slf4j
public class JedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.pool.max-active}")
private int maxActive;
@Value("${spring.redis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.pool.max-wait}")
private long maxWaitMillis;
@Bean
public JedisPool redisPoolFactory(){
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
jedisPoolConfig.setMaxTotal(maxActive);
jedisPoolConfig.setMinIdle(minIdle);
JedisPool jedisPool = new JedisPool(jedisPoolConfig,host,port,timeout,null);
log.info("JedisPool注入成功!");
log.info("redis地址:" + host + ":" + port);
return jedisPool;
}
}
首先最简单的分布式锁实现方式:
- SETNX key value,只在键 key 不存在的情况下,将键 key 的值设置为 value,返回1 。若键 key 已经存在, 则 SETNX 命令不做任何动作,返回0
- EXPIRE key seconds 为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除,设置成功返回1,设置失败返回0
- DEL key [key …] 删除key
这里是通过setnx设置键值,并设置过期时间,进行加锁,如果要释放锁则删除对应的key,但是存在一个隐患:
如果在执行完setnx方法时,服务器异常或者关闭,导致expire及del方法不能顺利执行就会导致死锁
针对上述这种请求,我们使用redis提供的另一个方法进行加锁及释放锁
- SET resource_name my_random_value NX PX 30000 NX表示当key不存在时才会执行,Px表示设置过期时间单位毫秒,既上述表示为在key不存在时设置key值为my_random_value(这个值必须是各个客户端之间唯一,比如用户id之类的唯一值,这样是为了后续的释放锁)
- 删除锁时通过lua脚本实现当key以及value值完全一致时进行删除key,这就需要我们的value值各个客户端之间必须唯一,不然就会存在误删的操作。
代码如下:
@Component("redisClient")
@Slf4j
public class RedisClient {
private static final String SET_IS_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
@Resource
private JedisPool jedisPool;
/**
* 进行分布式加锁
*
* @param key
* @param requestId
* @param expireTime
* @return
*/
public boolean tryGetDistributedLock(String key, String requestId, int expireTime) {
log.info("redisClient加锁.........{},{}", key, requestId);
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String result = jedis.set(key, requestId, SET_IS_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
} catch (Exception e) {
log.error("{}获取分布式锁出错:{}", requestId, e.getMessage());
} finally {
if(jedis != null){
jedis.close();
}
}
return false;
}
/**
* 释放锁
*
* @param key
* @param requestId
* @return
*/
public boolean releaseDistributedLock(String key, String requestId) {
Jedis jedis = null;
log.info("redisClient释放锁.........{},{}", key, requestId);
try {
//这个lua脚本表示当key和value完全一致时进行删除,并返回1,如果不成功返回0
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis = jedisPool.getResource();
Object result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
} catch (Exception e) {
log.error("{}释放分布式锁出错:{}", requestId, e.getMessage());
} finally {
if(jedis != null){
jedis.close();
}
}
return false;
}
}
测试如下:
我们先打开redis命令框,设置一个操作的值
然后再进行模拟6000并发,去减这个值,代码如下
@RestController
@RequestMapping("test")
public class TestController {
@Autowired
private RedisClient redisClient;
@PostMapping("redis")
public void setRedis(){
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 6000 ; i++){
new Thread(new Worker(countDownLatch)).start();
}
countDownLatch.countDown();
}
private class Worker implements Runnable {
private CountDownLatch countDownLatch;
public Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
countDownLatch.await();
String nxTest = redisClient.get("nxTest");
if (Integer.parseInt(nxTest) > 0) {
redisClient.decr("nxTest");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}![在这里插入图片描述](https://img-blog.csdnimg.cn/20200414170607231.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3N3aW1taW5nX2x1,size_16,color_FFFFFF,t_70)
结果如下:
很明显出现了超卖的情况,我们再加上锁,并进行多次测试,代码如下:
@RestController
@RequestMapping("test")
public class TestController {
@Autowired
private RedisClient redisClient;
@PostMapping("redis")
public void setRedis(){
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 6000 ; i++){
new Thread(new Worker(countDownLatch,i)).start();
}
countDownLatch.countDown();
}
private class Worker implements Runnable {
private CountDownLatch countDownLatch;
private Integer count;
public Worker(CountDownLatch countDownLatch,int i) {
this.countDownLatch = countDownLatch;
this.count = i;
}
@Override
public void run() {
try {
countDownLatch.await();
if(redisClient.tryGetDistributedLock("testKey5","keyValue"+count,60 * 1000)){
String nxTest = redisClient.get("nxTest");
if (Integer.parseInt(nxTest) > 0) {
redisClient.decr("nxTest");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}finally{
redisClient.releaseDistributedLock("testKey5","keyValue"+count);
}
}
}
结果如下:
很明显看出加锁后多次测试并没有出现超卖的情况,成功顶住并发。
但是这种写法同样存在问题,如果redis是主从的模式,会出现如下情况:
- 客户端A从master获取到锁
- master将锁同步到slave之前,master宕掉了。
- slave节点被晋级为master节点
- 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!
虽然是极端的情况,但确实存在这样的问题