基于Consul互斥锁Lock的实现

背景

最近给apache shardingsphere 贡献了一个基于consul做集群模式的注册中心,已经被亮哥merge到5.2.1的版本,支持对互斥锁,可重入锁,超时锁的实现,以及支持类似zk的临时节点和watch机制,支持新增,删除,修改三种事件,感兴趣的同学可以去到apache shardingsphere的源码,这篇文章主要是总结下,基于consul 实现互斥锁的背后的原理的知识点。

Consul Session 机制

要明白Consul实现锁,我们需要知道consul的两个核心特性,consul的session机制和阻塞查询,先放一段客户端创建session的代码,这样好理解:

private String createSession(final String lockName) {
            NewSession session = new NewSession();
            session.setName(lockName);
            // lock was released by force while session is invalid
            session.setBehavior(Session.Behavior.RELEASE);
          session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
            return consulClient.sessionCreate(session, null).getValue();
        }

Session Behavior

consul session 的两个核心特性是behaviorttl, behavior 有两种类型,分别是Release 和 Delete,聪明的同学一看就知道release就是用来实现lock机制的,delete 就是用来实现类型zk的临时节点机制的,当session 的ttl失效时,consul server就会根据behavior来做对应的action,我们这里是实现锁创建的session,所以指定为release。

Session Ttl

ttl 机制是consul用来判断这个session 是否还有效,就类似租约机制一个道理,我们如果要让这个session 有校,就要定期的更新ttl,在consul这里叫renew session。

Session Id

上面的代码我们创建一个session,consul会返回给我们一个session id,这个session id有啥用呢,就是我们创建key value的时候,我们可以带上这个session id,那个这个key value就是和这个session 绑定的,session 过期那这个对应的key value 就会被consul 有策略来处理掉,策略就是上面提供的release和delete两种,对应的代码实现如下:

String sessionId = createSession(lockPath);
//acquire session 代表获取锁
putParams.setAcquireSession(sessionId);
//写key value时带上 session id
Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
if (response.getValue()) {
        // lock success
        lockSessionMap.set(sessionId);
        SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
        return true;
} else {
  //获取锁失败,如果是互斥锁,则需要继续wait,等待锁释放,下面会讲到
}

绑定key value的session consul会保证只有一个session id会操作成功,其他的都会失败,这个是实现互斥锁的关键,你可以理解每个线程获取锁时,都会对应一个session id,锁就是一个key,获取锁成功后,需要定时给session 续约,也就是定期调用renew session,除非主动释放锁,否则如果进程挂了,这个锁就不能释放,会导致一直等待,类似死锁了。

另外获取锁成功后,需要把session id 放到线程上下文中,作用是用来释放锁的时候,需要用的这个session id的,下面释放锁会用到。

释放锁

释放锁的实现相对简单,就是set key 操作时,指定release 参数的值为获取锁时候的session id,代码如下:

public void unlock() {
            try {
                PutParams putParams = new PutParams();
                //从线程上下文获取session id,我们前面获取锁成功时,会放进去。
                String sessionId = lockSessionMap.get();
                putParams.setReleaseSession(sessionId);
                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
                consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
                //消费对应的session
                consulClient.sessionDestroy(sessionId, null);
                // CHECKSTYLE:OFF
            } catch (final Exception ex) {
                // CHECKSTYLE:ON
                log.error("EtcdRepository unlock error, lockName: {}", lockName, ex);
            } finally {
                lockSessionMap.remove();
            }
        }

有的同学看到这里就有疑问了,这里只是简单的释放锁,并没看到通知那些在等待锁释放的线程,这里其实时通过consul 来通知的,consul 有个阻塞查询机制,如果对应的key value有发生变更,则会通知阻塞在这个key value上的查询客户端,释放锁也是一个变更操作,consul 对变更的这个key 的版本会变更,客户端
可以通过这个版本判断是否真的发生了变更。

等待锁

等待锁分两种,一种是一直等待,值得有人释放锁,这种一般是分布式锁,选举的场景,我们平时一般用的都是try lock,带有时间参数的,不可能一直等,所以需要指定超时时间,如果指定的时间内获取不到锁,就超时退出,不再等待。

Consul 阻塞查询

通过consul实现锁等待,是通过consul的阻塞查询机制实现的,我们通过consul做动态的实时变更也是通过这个方式,我们指定一个阻塞的时间,比如30s,如果30s内没有发生变更,consul则会阻塞,当前的http请求,不响应,如果30s内有变更,则响应,如果还没有变更则返回当前的key的value数据,需要客户端自己判断是超时返回了,还是真正的发生了变更,基于篇幅的原因,这里只讲有时间范围的锁等待,下面放上关键的代码:

private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
            long currentIndex = valueIndex;
            if (currentIndex < 0) {
                currentIndex = 0;
            }
            AtomicBoolean running = new AtomicBoolean(true);
            long waitCostTime = 0L;
            long now = System.currentTimeMillis();
            long deadlineWaitTime = now + waitTime;
            long blockWaitTime = waitTime;
            while (running.get()) {
                //每次重新获取锁时,需要检查等待时间是否已经达到,如果已经达到则退出,不再获取锁。
                long startWaitTime = System.currentTimeMillis();
                if (startWaitTime >= deadlineWaitTime) {
                    // wait time is reached max
                    return waitTime;
                }
                //阻塞查询,有人释放了锁,则这里会返回,当然如果在超时时间到达后,还没有对应的锁释放,则超时返回。
                RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
                Response<GetValue> response = warpRawResponse(rawResponse);
                Long index = response.getConsulIndex();
                //计算本次阻塞的时间总和。
                waitCostTime += System.currentTimeMillis() - startWaitTime;
                blockWaitTime -= waitCostTime;
                if (null != index && index >= currentIndex) {
                    if (currentIndex == 0) {
                        //这里是consul 有个bug,第一次会立即返回。
                        currentIndex = index;
                        continue;
                    }
                    currentIndex = index;
                    GetValue getValue = response.getValue();
                    if (null == getValue || null == getValue.getValue()) {
                        return waitCostTime;
                    }
                    if (!key.equals(getValue.getKey())) {
                        continue;
                    }
                    return waitCostTime;
                }
                if (null != index) {
                    currentIndex = 0;
                }
            }
            return -1;
        }

等待锁释放的关键是阻塞试查询,这里我重写了一个客户端,默认的开源阻塞时间只支持秒为单位,consul 服务端是支持毫秒级别的,我们平时也基本都是毫秒的单位,秒级别太大,没有人会用,所以需要重写,阻塞的时间blockWaitTime就是try lock指定的超时时间。

上面说过,这个阻塞查询get request操作,返回结果时有可能真的有人释放锁,也可能是超时返回了,因为这里阻塞的时间就是超时时间,所以这里判断的index 是大于等于,因为超时返回时等于时间已经到,即waitCostTime 就是blockWaitTime,则不再去获取锁,直接返回,如果这中间锁被释放,外面的代码会尝试去重新获取锁,如果没有获取到,则继续这个wait的逻辑,如果还没有获取到,则需要重新计算阻塞查询的时间,不能还是用之前的那个时间,否则会一直等待的问题,下面放上获取锁的完整代码:

                long lockTime = timeoutMillis;
                PutParams putParams = new PutParams();
                String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
                while (true) {
                    //获取锁
                    String sessionId = createSession(lockPath);
                    //acquire session 绑定sessionid
                    putParams.setAcquireSession(sessionId);
                    Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
                    if (response.getValue()) {
                        // lock success
                        lockSessionMap.set(sessionId);
                        SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
                        return true;
                    }
                    // lock failed,exist race so retry
                    // block query if value is change so return
                    consulClient.sessionDestroy(sessionId, null);
                    long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
                    if (waitTime < lockTime) {
                        //这里需要重新计算wait的时间,考虑多人或者多线程获取锁的情况
                        lockTime = lockTime - waitTime;
                        continue;
                    }
                    return false;
                }

上面总体说明了基于consul 实现互斥锁的原理,同样也能做为分布式锁来用,主要是依赖consul的session 机制,阻塞查询,版本等机制来实现,非超时锁,即一直等待,直到获取到锁的代码实现没有分析,如果你感兴趣,可以参考apache shardingsphere 5.2.1版本的集群模式consul repository模块的完整代码。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容