前言
在我们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作明显不符合原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的系统如果是单体架构,那我们使用本地锁就可以解决问题。如果是分布式架构,就需要使用分布式锁。
方案
使用 SETNX 和 EXPIRE 命令
if (setnx("item_1_lock", 1)) {
expire("item_1_lock", 30);
try {
... 逻辑
} catch {
...
} finally {
del("item_1_lock");
}
}
这种方法看起来可以解决问题,但是有一定的风险,因为 SETNX
和 EXPIRE
这波操作是非原子性的,如果 SETNX
成功之后,出现错误,导致 EXPIRE
没有执行,导致锁没有设置超时时间形成死锁。
针对这种情况,我们可以使用 lua 脚本来保持操作原子性,保证 SETNX
和 EXPIRE
两个操作要么都成功,要么都不成功。
if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;
通过这样的方法,我们初步解决了竞争锁的原子性问题,虽然其他功能还未实现,但是应该不会造成死锁 🤪🤪🤪。
Redis 2.6.12 以上可灵活使用 SET 命令
if (set("item_1_lock", 1, "NX", "EX", 30)) {
try {
... 逻辑
} catch {
...
} finally {
del("item_1_lock");
}
}
改进后的方法不需要借助 lua 脚本就解决了 SETNX
和 EXPIRE
的原子性问题。现在我们再仔细琢磨琢磨,如果 A 拿到了锁顺利进入代码块执行逻辑,但是由于各种原因导致超时自动释放锁。在这之后 B 成功拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑完毕再来释放锁,就会把 B 刚获得的锁释放了。就好比用自己家的钥匙开了别家的门,这是不可接受的。
为了解决这个问题我们可以尝试在 SET
的时候设置一个锁标识,然后在 DEL
的时候验证当前锁是否为自己的锁。
String value = UUID.randomUUID().toString().replaceAll("-", "");
if (set("item_1_lock", value, "NX", "EX", 30)) {
try {
... 逻辑
} catch {
...
} finally {
... lua 脚本保证原子性
}
}
if (redis.call('get', KEYS[1]) == ARGV[1])
then return redis.call('del', KEYS[1])
else return 0
end
到这里,我们终于解决了竞争锁的原子性问题和误删锁问题。但是锁一般还需要支持可重入、循环等待和超时自动续约等功能点。下面我们学习使用一个非常好用的包来解决这些问题 👏👏👏。
入门 Redisson
Redission 的锁,实现了可重入和超时自动续约功能,它都帮我们封装好了,我们只要按照自己的需求调用它的 API 就可以轻松实现上面所提到的几个功能点。详细功能可以查看 Redisson 文档
在项目中安装 Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.2</version>
</dependency>
implementation 'org.redisson:redisson:3.13.2'
用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2
,也可以在这里 Redisson 找到你需要的版本。
简单尝试
RedissonClient redissonClient = Redisson.create();
RLock lock = redissonClient.getLock("lock");
boolean res = lock.lock();
if (res) {
try {
... 逻辑
} finally {
lock.unlock();
}
}
Redisson 将底层逻辑全部做了一个封装 📦,我们无需关心具体实现,几行代码就能使用一把完美的锁。下面我们简单折腾折腾源码 🤔🤔🤔。
加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程 id
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 获取成功直接返回
if (ttl == null) {
return;
}
// 获取失败,订阅锁对应的频道
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// 获取成功直接返回
if (ttl == null) {
break;
}
// 等待 ttl 时间后继续获取
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消频道订阅
unsubscribe(future, threadId);
}
}
获取锁
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 如果设置了锁过期时间,则按普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 如果没有设置锁过期时间,则开启自动续约功能,先设置 30 秒过期时间
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 有错误直接返回
if (e != null) {
return;
}
// 获取锁
if (ttlRemaining == null) {
// 开启自动续约
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
/**
* 锁不存在,使用 hincrby 创建新 hash 表以及给锁计数自增 1,并设置过期时间
* 锁存在并且属于当前线程,给锁计数自增 1,并设置过期时间
* 锁存在但是不属于当前线程,返回锁过期时间
**/
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
删除锁
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// 解锁逻辑
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消刷新过期时间的定时任务
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
// 解锁线程和锁不是同一个线程,抛错
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
/**
* 判断锁是否属于当前线程,不属于直接返回
* 锁计数减去 1,如果锁计数还大于 0,则设置过期时间,否则释放锁并发布锁释放消息
**/
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
总结
使用 Redis 做分布式锁来解决并发问题仍存在一些困难,也有很多需要注意的点,我们应该正确评估系统的体量,不能为了使用某项技术而用。要完全解决并发问题,仍需要在数据库层面做功夫。🧐🧐🧐