redisson 分布式锁之可重入锁原理解析

基础理论

  • redisson是使用java实现的操作redis的一个工具,redisson可以作为spring-data的底层实现,通过redisTemplate封装的api来使用,redisson利用redis中的hash数据结构来实现获取锁、锁重入和释放锁等。
  • redisson包含了各种分布式锁的实现其中包括了,可重入锁、公平锁、读写锁、联锁、红锁等。java中常用的锁redisson都有分布式的实现方案。
  • redisson实现的可重入锁原理跟java中的ReentrantLock类似,通过redis的hash数据来获取锁和锁的可重入,redis的发布订阅消息实现了线程阻塞和重试获取锁。

源码和流程

获取锁原理解析

  1. 如下图使用redisson加锁后会在redis中创建一个hash类型的数据,其中redisKey是order,key是线程的线程id+线程获取锁的次数,value是线程获取锁的次数。
锁数据结构.png
  1. redisson获取锁的实现是通过lua脚本来实现的具体实现如下
//判断rediskey是否存在,如果不存在则表示锁没有被其它线程获取
"if (redis.call('exists', KEYS[1]) == 0) then " +
//创建命名为order的hash数据,并且把线程id作为key,1作为value存入hash中
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重置redis过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回nil在java中就是null
"return nil; " +
"end; " +
//到这一步了则表示锁已经被获取了接下来判断获取锁的线程是否是当前线程
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//如果获取锁成功,代表获取锁次数的value+1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重置redisKey的有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//到了这一步则表示获取锁已经失败了,最后返回redisKey有效期的剩余时间
"return redis.call('pttl', KEYS[1]);"

3.源码解析

    @Test
    void method1() throws InterruptedException {
        // 尝试获取锁
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("获取锁失败 .... 1");
            return;
        }
        try {
            log.info("获取锁成功 .... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 .... 1");
            lock.unlock();
        }
    }
    
    @Override
    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return tryLock(waitTime, -1, unit);
    }
    
    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        
        //1.执行lua脚本并且根据返回的结果ttl判断获取锁是否成功
        //2.如果获取锁成功并且leaseTime(锁释放时间)为-1则开启看门狗
        //刷新锁的过期时间防止锁过期失效
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        //计算剩余的锁等待时间如果过期了直接返回false
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //订阅锁释放消息,订阅成功后线程会被阻塞在这里等待其它线程释放锁
        //并且发布消息,等待是有时间限制的
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            //超过锁等待时间订阅的消息还未发布直接取消订阅并且返回false
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            //计算剩余的锁等待时间并且判断等待时间是否<=0如果等待时间
            //用完了直接返回false
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                //重新尝试获取锁根据返回的ttl判断锁是否获取成功
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                //判断剩余的锁等待超时时间是否清零了如果清零了返回false
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 比较ttl和time谁时间少,时间少的作为第二次订阅消息的
                //等待超时时间
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                //判断剩余时间是否超时
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
    }
    
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //阻塞线程等到lua脚本返回的ttl的值
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //leaseTime锁过期时间 如果!= -1 不需要开启看门狗执行后直接返回即可
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //执行释放锁和发布锁释放消息的lua脚本
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //等待lua脚本执行结束
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            //判断如果获取锁成功,设置看门狗
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
    
    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        //ExpirationEntry map存放这所有的看门狗定时任务主要作用是
        //1.循环执行定时任务时判断锁是否被释放,锁释放时会把map中的key删除。
        //2.释放锁时删除map中的看门狗定时任务
        //getEntryName() -> e7b433e7-8f44-46c8-b98a-c59f487b6136:order
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }
    
    private void renewExpiration() {
        //获取看门狗定时任务判断锁是否被释放
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        //执行定时任务
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                //重置redisKey的时间
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    //递归调用定时任务,保障锁未释放期间redisKey不会过期
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        //internalLockLeaseTime / 3 = 10s 默认10s刷一次
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

释放锁原理

  1. redisson释放锁的实现也是通过lua脚本来实现的具体实现如下
//检查锁是否是自己的
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
//如果锁不是自己的直接返回nil
"return nil;" +
"end; " +
//释放一次锁之后返回剩余的数量
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果counter>0表示锁还未被完全释放
"if (counter > 0) then " +
//重置锁的有效期
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//counter == 0 表示锁被完全释放删除redisKey
"redis.call('del', KEYS[1]); " +
//发布消息给还在等待获取锁的线程
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
  1. 源码解析
    @Test
    void method1() throws InterruptedException {
        // 尝试获取锁
        lock.lock();
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("获取锁失败 .... 1");
            return;
        }
        try {
            log.info("获取锁成功 .... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 .... 1");
            lock.unlock();
        }
    }
    
    @Override
    public void unlock() {
        try {
            //阻塞等待释放锁的流程执行
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
    }
    
    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        //执行释放锁的lua脚本并且发布锁释放消息
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        //等待lua脚本执行完并且执行下面的代码
        future.onComplete((opStatus, e) -> {
            
            //取消看门狗定时任务
            cancelExpirationRenewal(threadId);

            //出现异常抛出异常
            if (e != null) {
                result.tryFailure(e);
                return;
            }

            //lua脚本执行返回的结果opStatus如果是null释放锁失败
            //抛出业务异常
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            
            //锁释放成功
            result.trySuccess(null);
        });

        return result;
    }
    
    void cancelExpirationRenewal(Long threadId) {
        //从EXPIRATION_RENEWAL_MAP中取出看门狗定时任务判断定时任务是否为空
        //如果定时任务为空直接停止
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        //删除EXPIRATION_RENEWAL_MAP中的定时任务
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
        
        //取消定时任务
        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }
    

获取锁和释放锁流程

如下图左侧的流程图是获取redisson锁的过程右侧的流程图是释放redisson锁的流程图

源码流程-1.png
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容