背景
在一次对外接口的开发中,采用下图的zk分布式锁方式让请求排队进入,发现存在一定程度的锁失效情况,然后经过分析,发现问题出现在finally模块的主动删除操作中。
private CuratorFramework client = ZKClientUtil.getZKClient();
public T getDistributedLock(String path, int maxMaitMilliSeconds) throws Exception {
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(maxMaitMilliSeconds, TimeUnit.MILLISECONDS)) {
try {
return process();
}
finally {
lock.release();
try {
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} catch (KeeperException.NoNodeException e) {
// ignore, already deleted
}
}
}
return null;
}
什么情况下锁失效
假如有线程1、线程2、线程3申请获取同一把锁A,线程1执行完release后,线程2进入持有锁,线程1继续执行delete删除锁A,此时线程3进入申请锁A成功,则造成线程2、线程3同时进入执行。
为什么finally主动删除
经过排查,zk分布式锁创建lockPath的持久化节点和lockPath下级的临时顺序节点,其中持久化节点会一直保存,大量请求会导致内存溢出。如图所示但是以上也不是解决问题的好办法,多线程会存在锁失效情况,怎么解决呢?两种方法:
- 基于zk锁节点删除,定时判断lock节点下是否为空,为空就删除
- 基于业务删除,可以基于不同业务不通请求拼装的锁path,对于已完成的请求定时删除
锁原理
首先,通过threadData判断是否是重入锁,说明curator支持可重入,threadData是一个ConcurrentMap,记录当前容器已经获取锁的线程。
然后,attemptLock()就是尝试加锁,后面详解。
最后,将获取到的锁放入threadData。
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
上面方法作用就是创建临时顺序节点,path是由lockPath=/lock/1660614248482/拼接LOCK_NAME = "lock-"形成,在forPath()方法执行创建逻辑
由图中可以看到,通过getProtectedPrefix()方法,返回固定值"_ c _"+uuid,形成图中所示的临时节点路径,
try
{
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
catch ( KeeperException.NoNodeException e )
{
if ( createParentsIfNeeded )
{
ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider());
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
else
{
throw e;
}
}
从以上代码可知,第一次调用create()方法,因为路径不存在,会抛出异常,在异常catch逻辑中,通过ZKPaths.mkdirs()循环创建子路径,然后再次调用create()创建。ZKPaths.mkdirs()核心逻辑如下代码所示:
int pos = 1; // skip first slash, root is guaranteed to exist
do
{
pos = path.indexOf('/', pos + 1);
if ( pos == -1 )
{
if ( makeLastNode )
{
pos = path.length();
}
else
{
break;
}
}
String subPath = path.substring(0, pos);
if ( zookeeper.exists(subPath, false) == null )
{
try
{
List<ACL> acl = null;
if ( aclProvider != null )
{
acl = aclProvider.getAclForPath(path);
if ( acl == null )
{
acl = aclProvider.getDefaultAcl();
}
}
if ( acl == null )
{
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
}
zookeeper.create(subPath, new byte[0], acl, CreateMode.PERSISTENT);
}
catch ( KeeperException.NodeExistsException e )
{
// ignore... someone else has created it since we checked
}
}
}
while ( pos < path.length() );
当前锁path创建完成之后,还不能表示持有锁,接下来进入internalLockLoop(long startMillis, Long millisToWait, String ourPath)判断是否持有锁。如下代码所示:
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
if ( stat != null )
{
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
}
// else it may have been deleted (i.e. lock released). Try to acquire again
}
}
}
可以看出来,首先获取basePath的children,即获取所有获取该锁的线程,然后获取当前锁生成临时节点值sequenceNodeName,进入getsTheLock方法,maxLeases是一个固定值1,判断当前index是否小于maxLeases,小于则持有锁,然后获取锁结束,否则获取上个节点previousSequencePath,同时对上个节点注册监听watcher,然后当前线程进入等待,直到超时或被唤醒。
至此,获取锁流程结束。