RedissonLock加锁逻辑

加锁逻辑.png

RedissonLock不同的加锁方法,流程会有所差别:
tryLock()不带参数最终调用的是

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        "只有leaseTime为-1时才会,有延期锁的作用(也就是看门狗),锁的过期时间需要看配置,默认是30S"
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
           "获取锁成功后,调用定时任务延迟锁时间"
            if (ttlRemaining) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

传过来的参数leaseTime为-1,unint是null,这个方法获取不到直接就结束了,有点下failfast的模式,线程不会做自旋重试。
具体加锁实现:
tryLockInnerAsync:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

通过这种eval表达式,lua脚本保证原子性
如果不存在锁:
等价于:

命令 备注
EXISTS lockkey 判断锁是否存在
HSET lockkey uuid:threadId 1 设置hash field和value值
PEXPIRE lockkey internalLockLeaseTime 设置lockkey的过期时间

如果存在锁:
等价于:

命令 备注
HEXISTS lockkey uuid:threadId 判断当前线程是否已经获取到锁
HSET lockkey uuid:threadId 1 设置hash field和value值
HINCRBY lockkey uuid:threadId 1 给对应的field的值加1(相当于可重入)
PEXPIRE lockkey internalLockLeaseTime 重置过期时间

普通的加锁逻辑就结束了。
tryLock带参数就相对复杂一些,加入了线程自旋相关的逻辑处理:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
       "走tryAcquireAsync的逻辑"
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            "如果已超时,则直接失败"
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

如果客户端宕机,就不会在续期,那锁到了30s之后自然就失效了

锁续期逻辑:

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    // reschedule itself
                    renewExpiration();
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

特别注意:以上过程存在一个细节,这里有必要说明一下,也是分布式锁的一个关键点:当锁正在被占用时,\color{red}{等待获取锁的进程并不是通过一个 while(true) 死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程},有效的解决了无效的锁申请浪费资源的问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 我们不生产代码,我们是代码的搬运工 前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分...
    码农戏码阅读 3,757评论 0 4
  • 前言 单机环境下我们可以通过JAVA的Synchronized和Lock来实现进程内部的锁,但是随着分布式应用和集...
    Java黎先生阅读 4,104评论 0 1
  • 转自:http://tech.lede.com/2017/03/08/rd/server/Redisson/ 针对...
    王帅199207阅读 5,478评论 0 4
  • 分布式锁 什么是锁?使用锁的目的是为了控制程序的执行顺序,防止共享资源被多个线程同时访问。为了实现多个线程在一个时...
    缘起缘散_f1a7阅读 1,800评论 0 0
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 12,732评论 28 53