分布式锁介绍
分布式并发环境下,为了保证事务操作的原子性,需要引入分布式锁来保证一连串行为是原子性操作
例如经典的自增1操作
value = get(a);
set(a, value+1);
假设当前有两个线程要执行上面的操作,要保证线程a在保存新值之前,线程b不会读取到旧的值进行加一操作,一种实现方案就是引入分布式锁,保证该资源全局仅被一个线程占有
分布式锁设计
分布式锁最重要的操作就是加锁与解锁,除此以外还需要设置锁的过期时间,防止死锁问题的发生
加锁与解锁需要明确锁的是什么,对于同一类业务操作同一个资源对象,有且只有一个标识符对应到该资源对象,反之亦然。即锁定该资源对象的标识符需要是全局唯一的,在redis中就是锁对应的key的设计需要全局唯一
分布式锁的实现比较
Github: https://github.com/ChaselX/devbox-spring-boot-starter
Gitee: https://gitee.com/chasel96/devbox-spring-boot-starter
实现方案一 基于redis超时机制的分布式锁实现
最简单的加锁实现就是往redis中放入一个key,通过redis的setIfAbsent()
方法,若插入失败(该key对应的记录已经存在)则代表当前对应的资源已经被其他线程锁住了,休眠一段时间后再次尝试获取锁
而解锁就是从redis删除该key对应的记录
为了防止线程长时间占有锁不释放导致死锁,在加锁的时候设置该缓存的过期时间为n秒,当n秒过去,锁自动释放
实现代码:
@Component
public class DistributedLockCacheImpl implements DistributedLockCache {
private static Logger logger = LoggerFactory.getLogger(DistributedLockCacheImpl.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final static int EXPIRE_MS = 15 * 1000;
private final static int TIME_OUT_MS = 10 * 1000;
private final static String PREFIX_LOCK = "demo:distributed-lock:";
public void lock(String id) {
String key = PREFIX_LOCK + id;
int delay = 100;
int timeout = TIME_OUT_MS;
try {
while (timeout >= 0) {
Boolean result = redisTemplateString.opsForValue().setIfAbsent(key, "", EXPIRE_MS, TimeUnit.MILLISECONDS);
if (result) {
return;
}
timeout -= delay;
Thread.sleep(delay);
}
throw new DistributedLockException();
}catch (Exception ex){
throw new DistributedLockException();
}
}
public void release(String id) {
String key = PREFIX_LOCK + id;
stringRedisTemplate.delete(key);
}
}
这种实现有个问题:
- 若线程阻塞时间大于锁的超时时间,直接删除key会出现问题,有可能锁已经自动释放了,而此时删除的可能是其他线程的锁
优化实现方案一
引入随机值的概念,存入redis的value改为随机值,在释放锁的时候判断当前锁的value是否和随机值一致,若一致才进行删除操作,这需要引入redis事务,事务的使用参见深入理解redis事务
public boolean acquire(String id, String randomStr) {
int delay = 100;
int timeout = TIME_OUT_MS;
randomStr = UUID.randomUUID().toString();
try {
while (timeout >= 0) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, randomStr, expireMsecs, TimeUnit.MILLISECONDS);
if (result != null && result) {
return true;
}
timeout -= delay;
Thread.sleep(delay);
}
throw new DistributedLockException();
} catch (Exception e) {
throw new DistributedLockException();
}
}
public void release(String id, String randomStr) {
String key = PREFIX_LOCK + id;
stringRedisTemplate.watch(key);
String currentValue = stringRedisTemplate.opsForValue().get(key);
if (null==currentValue || !currentValue.equals(randomStr)) {
return;
}
stringRedisTemplate.multi();
stringRedisTemplate.delete(key);
stringRedisTemplate.exec();
}
释放锁这部分事务也可以使用redis脚本代替,Redis脚本也是事务型的。因此,可以通过Redis事务实现的功能,同样也可以通过Redis脚本来实现,而且通常脚本更简单、更快速。
实现方案二 锁超时时间约等于过期时间的分布式锁解决方案
这种实现不依赖于redis超时机制,也不用担心因为键未删除而带来的死锁问题。但由于其实现依赖于系统当前时间,需要保证服务器之间的系统时钟同步问题(linux可参考ntp时钟同步)。一般生产环境机器是默认需要做时钟同步的。
之所以说锁超时时间约等于expireMsecs是因为该方案无法严格保证锁的过期时间为expireMsecs
秒,因为在获取锁的时候,过期时间可能会在多线程并发getAndSet()
的时候被修改,导致过期时间和当前线程加锁时候放入的值不等同,但个人认为影响不大,实现代码如下:
package com.chasel.demo.cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* 锁超时时间约等于expireMsecs的分布式锁解决方案
*
* @author XieLongzhen
* @date 2019/3/22 10:31
*/
@Slf4j
public class RedisDistributedLock implements IDistributedLock {
private StringRedisTemplate redisTemplate;
/**
* 锁的键值
*/
private String lockKey;
/**
* 锁超时, 防止线程得到锁之后, 不去释放锁
*/
private int expireMsecs;
/**
* 锁等待, 防止线程饥饿
*/
private int timeoutMsecs;
/**
* 是否已经获取锁
*/
private boolean locked = false;
public RedisDistributedLock(String lockKey, int timeoutMsecs, int expireMsecs, StringRedisTemplate redisTemplate) {
this.lockKey = lockKey;
this.timeoutMsecs = timeoutMsecs;
this.expireMsecs = expireMsecs;
this.redisTemplate = redisTemplate;
}
public String getLockKey() {
return this.lockKey;
}
/**
* 方法去掉了synchronized关键字
*/
@Override
public boolean acquire() {
int timeout = timeoutMsecs;
try {
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires); // 锁到期时间
if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
locked = true;
log.debug("[1] 成功获取分布式锁!");
return true;
}
String currentValueStr = redisTemplate.opsForValue().get(lockKey); // redis里的时间
// 判断是否为空, redis旧锁是否已经过期, 如果被其他线程设置了值, 则第二个条件判断是过不去的
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
// 获取上一个锁到期时间, 并设置现在的锁到期时间
// 如果这个时候, 多个线程恰好都到了这里
// 只有一个线程拿到的过期时间是小于当前时间的,后续的线程set进去过期时间但拿到的过期时间会大于当前时间
// 只有一个线程的设置值和当前值相同, 那么它才有权利获取锁,其余线程继续等待
if (oldValueStr == null || oldValueStr.equals(currentValueStr)) {
locked = true;
log.debug("[2] 成功获取分布式锁!");
return true;
}
}
timeout -= 100;
Thread.sleep(100);
}
} catch (Exception e) {
log.error("获取锁出现异常, 必须释放: {}", e.getMessage());
}
return false;
}
/**
* 方法去掉了synchronized关键字
*/
@Override
public void release() {
try {
if (locked) {
String currentValueStr = redisTemplate.opsForValue().get(lockKey); // redis里的时间
// 校验是否超过有效期, 如果不在有效期内, 那说明当前锁已经失效, 不能进行删除锁操作
if (currentValueStr != null && Long.parseLong(currentValueStr) > System.currentTimeMillis()) {
redisTemplate.delete(lockKey);
locked = false;
log.debug("[3] 成功释放分布式锁!");
}
}
} catch (Exception e) {
log.error("释放锁出现异常, 必须释放: {}", e.getMessage());
}
}
}
使用通过DistributedLockComponent
来使用,在业务代码中通过DistributedLockComponent
获取锁以后只需要简单的acquire()
然后release()
即可
package com.chasel.demo.cache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* 分布式锁工具类
*
* @author XieLongzhen
* @date 2019/3/21 10:36
*/
@Component
public class DistributedLockComponent {
@Autowired
private StringRedisTemplate redisTemplate;
private final String PREFIX_KEY = “demo:distributed-lock:”;
/**
* 锁等待, 防止线程饥饿
*/
private final int DEFAULT_TIMEOUT_MSECS = 15 * 1000;
/**
* 锁超时, 防止线程得到锁之后, 不去释放锁
*/
private final int DEFAULT_EXPIRE_MSECS = 15 * 1000;
/**
* 获取分布式锁
* 默认获取锁15s超时, 锁过期时间15s
*/
public IDistributedLock getRedisLock(String key) {
return getRedisLock(key, DEFAULT_TIMEOUT_MSECS, DEFAULT_EXPIRE_MSECS);
}
/**
* 获取分布式锁
*/
public IDistributedLock getRedisLock(String key, int timeoutMsecs) {
return getRedisLock(key, timeoutMsecs, DEFAULT_EXPIRE_MSECS);
}
/**
* 获取分布式锁
*/
public IDistributedLock getRedisLock(String key, int timeoutMsecs, int expireMsecs) {
return new RedisDistributedLock(assembleKey(key), timeoutMsecs, expireMsecs, redisTemplate);
}
/**
* 对 lockKey 进行拼接装配
*
* @param key 系统内保证该lockKey唯一即可
*/
private String assembleKey(String key) {
return String.format("%s%s", PREFIX_KEY, key);
}
}