背景
最近给apache shardingsphere 贡献了一个基于consul做集群模式的注册中心,已经被亮哥merge到5.2.1的版本,支持对互斥锁,可重入锁,超时锁的实现,以及支持类似zk的临时节点和watch机制,支持新增,删除,修改三种事件,感兴趣的同学可以去到apache shardingsphere的源码,这篇文章主要是总结下,基于consul 实现互斥锁的背后的原理的知识点。
Consul Session 机制
要明白Consul实现锁,我们需要知道consul的两个核心特性,consul的session机制和阻塞查询,先放一段客户端创建session的代码,这样好理解:
private String createSession(final String lockName) {
NewSession session = new NewSession();
session.setName(lockName);
// lock was released by force while session is invalid
session.setBehavior(Session.Behavior.RELEASE);
session.setTtl(consulProperties.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
return consulClient.sessionCreate(session, null).getValue();
}
Session Behavior
consul session 的两个核心特性是behavior
和 ttl
, behavior 有两种类型,分别是Release 和 Delete,聪明的同学一看就知道release就是用来实现lock机制的,delete 就是用来实现类型zk的临时节点机制的,当session 的ttl失效时,consul server就会根据behavior来做对应的action,我们这里是实现锁创建的session,所以指定为release。
Session Ttl
ttl 机制是consul用来判断这个session 是否还有效,就类似租约机制一个道理,我们如果要让这个session 有校,就要定期的更新ttl,在consul这里叫renew session。
Session Id
上面的代码我们创建一个session,consul会返回给我们一个session id,这个session id有啥用呢,就是我们创建key value的时候,我们可以带上这个session id,那个这个key value就是和这个session 绑定的,session 过期那这个对应的key value 就会被consul 有策略来处理掉,策略就是上面提供的release和delete两种,对应的代码实现如下:
String sessionId = createSession(lockPath);
//acquire session 代表获取锁
putParams.setAcquireSession(sessionId);
//写key value时带上 session id
Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
if (response.getValue()) {
// lock success
lockSessionMap.set(sessionId);
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
return true;
} else {
//获取锁失败,如果是互斥锁,则需要继续wait,等待锁释放,下面会讲到
}
绑定key value的session consul会保证只有一个session id会操作成功,其他的都会失败,这个是实现互斥锁的关键,你可以理解每个线程获取锁时,都会对应一个session id,锁就是一个key,获取锁成功后,需要定时给session 续约,也就是定期调用renew session,除非主动释放锁,否则如果进程挂了,这个锁就不能释放,会导致一直等待,类似死锁了。
另外获取锁成功后,需要把session id 放到线程上下文中,作用是用来释放锁的时候,需要用的这个session id的,下面释放锁会用到。
释放锁
释放锁的实现相对简单,就是set key 操作时,指定release 参数的值为获取锁时候的session id,代码如下:
public void unlock() {
try {
PutParams putParams = new PutParams();
//从线程上下文获取session id,我们前面获取锁成功时,会放进去。
String sessionId = lockSessionMap.get();
putParams.setReleaseSession(sessionId);
String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
consulClient.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE, putParams);
//消费对应的session
consulClient.sessionDestroy(sessionId, null);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("EtcdRepository unlock error, lockName: {}", lockName, ex);
} finally {
lockSessionMap.remove();
}
}
有的同学看到这里就有疑问了,这里只是简单的释放锁,并没看到通知那些在等待锁释放的线程,这里其实时通过consul 来通知的,consul 有个阻塞查询机制,如果对应的key value有发生变更,则会通知阻塞在这个key value上的查询客户端,释放锁也是一个变更操作,consul 对变更的这个key 的版本会变更,客户端
可以通过这个版本判断是否真的发生了变更。
等待锁
等待锁分两种,一种是一直等待,值得有人释放锁,这种一般是分布式锁,选举的场景,我们平时一般用的都是try lock,带有时间参数的,不可能一直等,所以需要指定超时时间,如果指定的时间内获取不到锁,就超时退出,不再等待。
Consul 阻塞查询
通过consul实现锁等待,是通过consul的阻塞查询机制实现的,我们通过consul做动态的实时变更也是通过这个方式,我们指定一个阻塞的时间,比如30s,如果30s内没有发生变更,consul则会阻塞,当前的http请求,不响应,如果30s内有变更,则响应,如果还没有变更则返回当前的key的value数据,需要客户端自己判断是超时返回了,还是真正的发生了变更,基于篇幅的原因,这里只讲有时间范围的锁等待,下面放上关键的代码:
private long doWaitRelease(final String key, final long valueIndex, final long waitTime) {
long currentIndex = valueIndex;
if (currentIndex < 0) {
currentIndex = 0;
}
AtomicBoolean running = new AtomicBoolean(true);
long waitCostTime = 0L;
long now = System.currentTimeMillis();
long deadlineWaitTime = now + waitTime;
long blockWaitTime = waitTime;
while (running.get()) {
//每次重新获取锁时,需要检查等待时间是否已经达到,如果已经达到则退出,不再获取锁。
long startWaitTime = System.currentTimeMillis();
if (startWaitTime >= deadlineWaitTime) {
// wait time is reached max
return waitTime;
}
//阻塞查询,有人释放了锁,则这里会返回,当然如果在超时时间到达后,还没有对应的锁释放,则超时返回。
RawResponse rawResponse = ((ShardingSphereConsulClient) consulClient).getRawClient().makeGetRequest("/v1/kv/" + key, null, new ShardingSphereQueryParams(blockWaitTime, currentIndex));
Response<GetValue> response = warpRawResponse(rawResponse);
Long index = response.getConsulIndex();
//计算本次阻塞的时间总和。
waitCostTime += System.currentTimeMillis() - startWaitTime;
blockWaitTime -= waitCostTime;
if (null != index && index >= currentIndex) {
if (currentIndex == 0) {
//这里是consul 有个bug,第一次会立即返回。
currentIndex = index;
continue;
}
currentIndex = index;
GetValue getValue = response.getValue();
if (null == getValue || null == getValue.getValue()) {
return waitCostTime;
}
if (!key.equals(getValue.getKey())) {
continue;
}
return waitCostTime;
}
if (null != index) {
currentIndex = 0;
}
}
return -1;
}
等待锁释放的关键是阻塞试查询,这里我重写了一个客户端,默认的开源阻塞时间只支持秒为单位,consul 服务端是支持毫秒级别的,我们平时也基本都是毫秒的单位,秒级别太大,没有人会用,所以需要重写,阻塞的时间blockWaitTime
就是try lock指定的超时时间。
上面说过,这个阻塞查询get request操作,返回结果时有可能真的有人释放锁,也可能是超时返回了,因为这里阻塞的时间就是超时时间,所以这里判断的index 是大于等于,因为超时返回时等于时间已经到,即waitCostTime 就是blockWaitTime
,则不再去获取锁,直接返回,如果这中间锁被释放,外面的代码会尝试去重新获取锁,如果没有获取到,则继续这个wait的逻辑,如果还没有获取到,则需要重新计算阻塞查询的时间,不能还是用之前的那个时间,否则会一直等待的问题,下面放上获取锁的完整代码:
long lockTime = timeoutMillis;
PutParams putParams = new PutParams();
String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR + lockName;
while (true) {
//获取锁
String sessionId = createSession(lockPath);
//acquire session 绑定sessionid
putParams.setAcquireSession(sessionId);
Response<Boolean> response = consulClient.setKVValue(lockPath, DEFAULT_CONSUL_LOCK_VALUE, putParams);
if (response.getValue()) {
// lock success
lockSessionMap.set(sessionId);
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
return true;
}
// lock failed,exist race so retry
// block query if value is change so return
consulClient.sessionDestroy(sessionId, null);
long waitTime = doWaitRelease(lockPath, response.getConsulIndex(), lockTime);
if (waitTime < lockTime) {
//这里需要重新计算wait的时间,考虑多人或者多线程获取锁的情况
lockTime = lockTime - waitTime;
continue;
}
return false;
}
上面总体说明了基于consul 实现互斥锁的原理,同样也能做为分布式锁来用,主要是依赖consul的session 机制,阻塞查询,版本等机制来实现,非超时锁,即一直等待,直到获取到锁的代码实现没有分析,如果你感兴趣,可以参考apache shardingsphere 5.2.1版本的集群模式consul repository模块的完整代码。