分布式锁的实现种类
Redisson分布式锁的实现
代码实现
// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
RLock rLock = redissonClient.getLock(lockKey);
try {
/**
* 4.尝试获取锁
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
}finally{
//无论如何, 最后都要解锁
rLock.unlock();
}
加锁&解锁Lua脚本
Lua脚本加入的优点:
1.减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。
2.原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入,保证了原子性执行。
3.复用:客户端发送的脚本会永久存储在Redis中,意味着其他客户端可以复用这一脚本而不需要使用代码完成同样的逻辑。
加锁Lua脚本解析
//判断当前的key是否存在,C语言中非0即真
// 为0代表当前值不存在,设置key ,属性 , 值
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
//如果存在,且唯一标识属性也匹配,则表示当前加锁为锁的重入操作,
//对锁重入+1,并重新设置锁的过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果锁存在,唯一标识不匹配,表示其他的线程正在占用,当前线程无权获取资源,直接返回锁的过期时间
return redis.call('pttl', KEYS[1]);
源码解析-加锁
RedissonLock.java
//加锁方法
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly() throws InterruptedException {
//参数:1: 过期时间 2: 过期时间单位
lockInterruptibly(-1, null);
}
//真正去执行获取所 释放所的函数
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//获取当前的线程ID
long threadId = Thread.currentThread().getId();
//去尝试获取锁,返回值是过期时间
// 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 获取到锁的线程会发布一个消息订阅,用于去唤醒其他等待获取锁的线程
// 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage
/**
* 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
* 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
* 当 this.await返回true,进入循环尝试获取锁
*/
final RFuture<RedissonLockEntry> subscribeFuture =
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
// 订阅成功
try {
while (true) {
//重新去获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// 【核心点3】根据锁TTL,调整阻塞等待时长;
// 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
//2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
//尝试获取锁,返回一个RFuture 对象
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//
if (leaseTime != -1) {
// 实质是异步执行加锁Lua脚本
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 实质是异步执行加锁Lua脚本
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//发起个异步的监听事件,用于异步去获取ttlRemainingFuture 的返回状态
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
//先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
//执行续租的 定时任务 定时任务每隔10执行一次
private void scheduleExpirationRenewal(final long threadId) {
//expirationRenewalMap 是个ConcurrentMap 判断里面是否包含 2b7f630d-bfb8-4687-879a-bfbf3f2ab731:lock_key ,
//如果包含直接终止
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//去执行lua脚本进行 锁的续约
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
//清空里面的值
expirationRenewalMap.remove(getEntryName());
//异步接受 续约的 结果
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
//如果说没有续约成功,继续执行续约定时任务
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
//internalLockLeaseTime 默认30秒 30000毫秒 30/3=10 定时任务每隔10执行一次
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}