zk分布式锁之curator实现原理

背景

在一次对外接口的开发中,采用下图的zk分布式锁方式让请求排队进入,发现存在一定程度的锁失效情况,然后经过分析,发现问题出现在finally模块的主动删除操作中。

private CuratorFramework client = ZKClientUtil.getZKClient();

    public T getDistributedLock(String path, int maxMaitMilliSeconds) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, path);
        if (lock.acquire(maxMaitMilliSeconds, TimeUnit.MILLISECONDS)) {
            try {
                return process();
            }
            finally {
                lock.release();
                try {
                    client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
                } catch (KeeperException.NoNodeException e) {
                    // ignore, already deleted
                }
            }
        }
        return null;
    }

什么情况下锁失效

假如有线程1、线程2、线程3申请获取同一把锁A,线程1执行完release后,线程2进入持有锁,线程1继续执行delete删除锁A,此时线程3进入申请锁A成功,则造成线程2、线程3同时进入执行。

为什么finally主动删除

经过排查,zk分布式锁创建lockPath的持久化节点和lockPath下级的临时顺序节点,其中持久化节点会一直保存,大量请求会导致内存溢出。如图所示
zk锁节点

但是以上也不是解决问题的好办法,多线程会存在锁失效情况,怎么解决呢?两种方法:

  • 基于zk锁节点删除,定时判断lock节点下是否为空,为空就删除
  • 基于业务删除,可以基于不同业务不通请求拼装的锁path,对于已完成的请求定时删除

锁原理

首先,通过threadData判断是否是重入锁,说明curator支持可重入,threadData是一个ConcurrentMap,记录当前容器已经获取锁的线程。
然后,attemptLock()就是尝试加锁,后面详解。
最后,将获取到的锁放入threadData。

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
attemptLock方法核心代码

上面方法作用就是创建临时顺序节点,path是由lockPath=/lock/1660614248482/拼接LOCK_NAME = "lock-"形成,在forPath()方法执行创建逻辑


构建path路径

由图中可以看到,通过getProtectedPrefix()方法,返回固定值"_ c _"+uuid,形成图中所示的临时节点路径,

try
{
    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
catch ( KeeperException.NoNodeException e )
{
    if ( createParentsIfNeeded )
    {
        ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider());
        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
    }
    else
    {
        throw e;
    }
}

从以上代码可知,第一次调用create()方法,因为路径不存在,会抛出异常,在异常catch逻辑中,通过ZKPaths.mkdirs()循环创建子路径,然后再次调用create()创建。ZKPaths.mkdirs()核心逻辑如下代码所示:

int pos = 1; // skip first slash, root is guaranteed to exist
        do
        {
            pos = path.indexOf('/', pos + 1);

            if ( pos == -1 )
            {
                if ( makeLastNode )
                {
                    pos = path.length();
                }
                else
                {
                    break;
                }
            }

            String subPath = path.substring(0, pos);
            if ( zookeeper.exists(subPath, false) == null )
            {
                try
                {
                    List<ACL> acl = null;
                    if ( aclProvider != null )
                    {
                        acl = aclProvider.getAclForPath(path);
                        if ( acl == null )
                        {
                            acl = aclProvider.getDefaultAcl();
                        }
                    }
                    if ( acl == null )
                    {
                        acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
                    }
                    zookeeper.create(subPath, new byte[0], acl, CreateMode.PERSISTENT);
                }
                catch ( KeeperException.NodeExistsException e )
                {
                    // ignore... someone else has created it since we checked
                }
            }

        }
        while ( pos < path.length() );

当前锁path创建完成之后,还不能表示持有锁,接下来进入internalLockLoop(long startMillis, Long millisToWait, String ourPath)判断是否持有锁。如下代码所示:

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( stat != null )
                        {
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                    }
                    // else it may have been deleted (i.e. lock released). Try to acquire again
                }
            }
        }

可以看出来,首先获取basePath的children,即获取所有获取该锁的线程,然后获取当前锁生成临时节点值sequenceNodeName,进入getsTheLock方法,maxLeases是一个固定值1,判断当前index是否小于maxLeases,小于则持有锁,然后获取锁结束,否则获取上个节点previousSequencePath,同时对上个节点注册监听watcher,然后当前线程进入等待,直到超时或被唤醒。

getsTheLock方法

至此,获取锁流程结束。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,208评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,502评论 3 405
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,496评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,176评论 1 302
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,185评论 6 401
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,630评论 1 316
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,992评论 3 431
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,973评论 0 280
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,510评论 1 325
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,546评论 3 347
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,659评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,250评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,990评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,421评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,569评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,238评论 3 382
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,732评论 2 366

推荐阅读更多精彩内容