zk分布式锁之curator实现原理

背景

在一次对外接口的开发中,采用下图的zk分布式锁方式让请求排队进入,发现存在一定程度的锁失效情况,然后经过分析,发现问题出现在finally模块的主动删除操作中。

private CuratorFramework client = ZKClientUtil.getZKClient();

    public T getDistributedLock(String path, int maxMaitMilliSeconds) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, path);
        if (lock.acquire(maxMaitMilliSeconds, TimeUnit.MILLISECONDS)) {
            try {
                return process();
            }
            finally {
                lock.release();
                try {
                    client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
                } catch (KeeperException.NoNodeException e) {
                    // ignore, already deleted
                }
            }
        }
        return null;
    }

什么情况下锁失效

假如有线程1、线程2、线程3申请获取同一把锁A,线程1执行完release后,线程2进入持有锁,线程1继续执行delete删除锁A,此时线程3进入申请锁A成功,则造成线程2、线程3同时进入执行。

为什么finally主动删除

经过排查,zk分布式锁创建lockPath的持久化节点和lockPath下级的临时顺序节点,其中持久化节点会一直保存,大量请求会导致内存溢出。如图所示
zk锁节点

但是以上也不是解决问题的好办法,多线程会存在锁失效情况,怎么解决呢?两种方法:

  • 基于zk锁节点删除,定时判断lock节点下是否为空,为空就删除
  • 基于业务删除,可以基于不同业务不通请求拼装的锁path,对于已完成的请求定时删除

锁原理

首先,通过threadData判断是否是重入锁,说明curator支持可重入,threadData是一个ConcurrentMap,记录当前容器已经获取锁的线程。
然后,attemptLock()就是尝试加锁,后面详解。
最后,将获取到的锁放入threadData。

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
attemptLock方法核心代码

上面方法作用就是创建临时顺序节点,path是由lockPath=/lock/1660614248482/拼接LOCK_NAME = "lock-"形成,在forPath()方法执行创建逻辑


构建path路径

由图中可以看到,通过getProtectedPrefix()方法,返回固定值"_ c _"+uuid,形成图中所示的临时节点路径,

try
{
    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
catch ( KeeperException.NoNodeException e )
{
    if ( createParentsIfNeeded )
    {
        ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider());
        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
    }
    else
    {
        throw e;
    }
}

从以上代码可知,第一次调用create()方法,因为路径不存在,会抛出异常,在异常catch逻辑中,通过ZKPaths.mkdirs()循环创建子路径,然后再次调用create()创建。ZKPaths.mkdirs()核心逻辑如下代码所示:

int pos = 1; // skip first slash, root is guaranteed to exist
        do
        {
            pos = path.indexOf('/', pos + 1);

            if ( pos == -1 )
            {
                if ( makeLastNode )
                {
                    pos = path.length();
                }
                else
                {
                    break;
                }
            }

            String subPath = path.substring(0, pos);
            if ( zookeeper.exists(subPath, false) == null )
            {
                try
                {
                    List<ACL> acl = null;
                    if ( aclProvider != null )
                    {
                        acl = aclProvider.getAclForPath(path);
                        if ( acl == null )
                        {
                            acl = aclProvider.getDefaultAcl();
                        }
                    }
                    if ( acl == null )
                    {
                        acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
                    }
                    zookeeper.create(subPath, new byte[0], acl, CreateMode.PERSISTENT);
                }
                catch ( KeeperException.NodeExistsException e )
                {
                    // ignore... someone else has created it since we checked
                }
            }

        }
        while ( pos < path.length() );

当前锁path创建完成之后,还不能表示持有锁,接下来进入internalLockLoop(long startMillis, Long millisToWait, String ourPath)判断是否持有锁。如下代码所示:

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( stat != null )
                        {
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                    }
                    // else it may have been deleted (i.e. lock released). Try to acquire again
                }
            }
        }

可以看出来,首先获取basePath的children,即获取所有获取该锁的线程,然后获取当前锁生成临时节点值sequenceNodeName,进入getsTheLock方法,maxLeases是一个固定值1,判断当前index是否小于maxLeases,小于则持有锁,然后获取锁结束,否则获取上个节点previousSequencePath,同时对上个节点注册监听watcher,然后当前线程进入等待,直到超时或被唤醒。

getsTheLock方法

至此,获取锁流程结束。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容