InterProcessSemaphoreV2核心源码解读(分布式信号量的实现)

InterProcessSemaphoreV2是Curator提供的关键工具之一,通过操作InterProcessSemaphoreV2 API可以实现在分布式场景中如同像在JVM中操作Semaphore一样的功能,具有公平,可重入等多个特性。


tips1: 阅读本节,需要简单了解ZK及Curator的简单知识,任意门:Curator简单介绍
tips2: 阅读本节,建议先阅读分布式锁与分布式信号量实现原理
tips3:分布式信号量中,每个许可被称为lease


下面我们开始介阅读源码,看下InterProcessSemaphoreV2如何在分布式系统下实现了信号量。

创意信号量对象

public class ZkSempahore implements SharedCountListener{
    private static String path = "/semahore/test";
    private InterProcessSemaphoreV2 interProcessSemaphoreV2;
    

    //初始化方法,创建客户端
    public void init() throws Exception{
        //此处省略了zkclient的创建必要参数等
        CuratorFramework client = CuratorFrameworkFactory.newClient();
        SharedCount count = new SharedCount(client, path, 5);
        count.addListener(this);
        count.start();
        interProcessSemaphoreV2 = new InterProcessSemaphoreV2(client, path,count);
    }
    //获取lease操作
    public Lease acquire() throws Exception{
        return interProcessSemaphoreV2.acquire();
    }
    
    //释放lease操作
    public void release(Lease lease) {
        interProcessSemaphoreV2.returnLease(lease);
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        //count值发生变化被回调
    }
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        //client状态发生变化被回调
    }
}

acquire方法解析

public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
    {
        long startMs = System.currentTimeMillis();
        boolean hasWait = (unit != null);
        long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;

        Preconditions.checkArgument(qty > 0, "qty cannot be 0");

        ImmutableList.Builder<Lease> builder = ImmutableList.builder();
        boolean success = false;
        try
        {
            while ( qty-- > 0 )
            {
                int retryCount = 0;
                long startMillis = System.currentTimeMillis();
                boolean isDone = false;
                while ( !isDone )
                {
                    // 获取lease关键代码
                    switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
                    {
                        ....
                    }
                }
            }
            success = true;
        }
        finally
        {
            //如果获取失败,释放所有的节点
            if ( !success )
            {
                returnAll(builder.build());
            }
        }
        return builder.build();
    }

acquire第一次实际意义代码,这段代码internalAcquire1Lease去尝试获取lease,qty=1表示线程只能获取一个lease。
进入internalAcquire1Lease看实现

private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
    {

        //client状态如果不是启动,返回NULL
        if ( client.getState() != CuratorFrameworkState.STARTED )
        {
            return InternalAcquireResult.RETURN_NULL;
        }

        if ( hasWait )
        {
            //等待超时时间判断
            long thisWaitMs = getThisWaitMs(startMs, waitMs);
            if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
            {
                return InternalAcquireResult.RETURN_NULL;
            }
        }
        else
        {
            //获取锁的关键方法,如果获取失败则被阻塞
            lock.acquire();
        }
        Lease lease = null;

        try
        {
            PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
            //在指定的lease path下创建node
            String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
            String nodeName = ZKPaths.getNodeFromPath(path);
            lease = makeLease(path);

            if ( debugAcquireLatch != null )
            {
                debugAcquireLatch.await();
            }
            //同步块
            synchronized(this)
            {
                //死循环,通过return返回或者异常
                for(;;)
                {
                    List<String> children;
                    try
                    {   //获取该lease path下子节点集合
                        children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
                    }
                    catch ( Exception e )
                    {
                        if ( debugFailedGetChildrenLatch != null )
                        {
                            debugFailedGetChildrenLatch.countDown();
                        }
                        returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock
                        throw e;
                    }
                    //若子节点集合不包含当前节点创建的子节点,异常处理
                    if ( !children.contains(nodeName) )
                    {
                        log.error("Sequential path not found: " + path);
                        returnLease(lease);
                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                    }
                    //若lease path下创建的子节点个数,不超过令牌的最大个数,break出循环
                    if ( children.size() <= maxLeases )
                    {
                        break;
                    }
                    //若存在等待逻辑,执行等待逻辑
                    if ( hasWait )
                    {
                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
                        if ( thisWaitMs <= 0 )
                        {
                            returnLease(lease);
                            return InternalAcquireResult.RETURN_NULL;
                        }
                        wait(thisWaitMs);
                    }
                    else
                    {
                        wait();
                    }
                }
            }
        }
        finally
        {
            //释放当前lock path下创建的lock node
            //此处操作非常重要
            lock.release();
        }
        //返回lease
        builder.add(Preconditions.checkNotNull(lease));
        return InternalAcquireResult.CONTINUE;
    }

此处有几处需要注意

  1. 获取许可的操作被分为两部分
    1.1 首先通过lock.acquire()方法去得到创建lock path 下临时节点的权利,否则会阻塞;
    1.2 成功获取lock后需要通过创建lease path下临时节点,并判断lease path下临时节点个数小于令牌个数实现信号量。
    可以认为第二步是实现分布式信号量的关键
  2. 通过finally中的lock.release() 释放在1.1中被创建的lock path下临时节点。这一部的必要是当抛出异常或者成功获取lease后,需要去释放lock path下的临时节点让其他客户端有解除阻塞,有继续去获取lease的机会

也许有的朋友会问,为什么要将分布式信号量设计为两步,同时维护lock path和lease path。只维护一个lease path难道不可以吗?后面我会解答这个问题。

下面首先进入第一步,去获取lock锁,进入lock.acquire()方法内部,InterProcessSemaphoreV2内部实现了InterProcessMutex,InterProcessMutexq其实是分布式锁与分布式信号量实现原理中分布式锁的实现方案,简单而言,其实现了一个公平的可重入分布式锁。

  private boolean internalLock(long time, TimeUnit unit) throws Exception
    {

        Thread currentThread = Thread.currentThread();

        //可重入代码实现,threadData是一个单例的ConcurrentMap
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //如果不存在,则尝试获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {   //如果获取成功,则更新threadData,并返回
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
}

上面这段代码实现的主要功能是可重入,进入attemptLock,终于可以接近关键的信息了。

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;
            try
            {   
                //首先基于lock path创建一个节点,注意所有的客户端进来时不管是否获取锁,都首先创建节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //判断当前客户端是否获取锁
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }
        return null;
    }

上面这段代码,首先为本次申请创建了一个临时node。

 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //获取lock path下的所有node name,并根据node name编号降序排列
                List<String>        children = getSortedChildren();
                //获取当前acquire创建的node节点编号
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                //判断获取锁
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    //成功获取
                    haveTheLock = true;
                }
                else    
                {   //得到前一个编号的node 路径
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this)
                    {
                        try 
                        {
                            // 监听前一个node 
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {   //如果获取失败,删除自己的node 临时节点
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

以上这部分是较为核心代码,其设计思路如同分布式锁与分布式信号量实现原理介绍的,首先判断自己是否获取锁,没有的话就监听并wait前一个node。进入getsTheLock方法内部

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
   {   
       //自己的node在所有有序子节点集合中的编号位置
       int             ourIndex = children.indexOf(sequenceNodeName);
       validateOurIndex(sequenceNodeName, ourIndex);
       //maxLease恒等于1,判断自己是否处于第0位,也就是子节点中最小的节点,如果是最小节点则获取锁,否则不获取
       boolean         getsTheLock = ourIndex < maxLeases;
       //如果获取锁成功,则为null,否则返回排在自己前面node的path
       String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
       //包装好返回
       return new PredicateResults(pathToWatch, getsTheLock);
   }

自此分布式信号量InterProcessSemaphoreV2 acquire方法解读完成。
InterProcessSemaphoreV2的release方法较为简单。

 //在创建lease时候注册了回调方法
 private Lease makeLease(final String path)
    {
        return new Lease()
        {
            @Override
            public void close() throws IOException
            {
                try 
                {
                    client.delete().guaranteed().forPath(path);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    log.warn("Lease already released", e);
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    throw new IOException(e);
                }
            }

            @Override
            public byte[] getData() throws Exception
            {
                return client.getData().forPath(path);
            }

            @Override
            public String getNodeName() {
                return ZKPaths.getNodeFromPath(path);
            }
        };
    }

以上InterProcessSemaphoreV2核心源码解读基本完成。其设计思路基于了分布式锁与分布式信号量实现原理中的方法。
总体上InterProcessSemaphoreV2基于InterProcessMutex实现了一个非公平信号量。回到提到的问题,为什么需要设置成两个zk path?
个人决定原因主要在于zk node 节点状态变动event一般都是发送给单节点,主要为了避免羊群效应。通过lock path可以实现大家在获取许可时,只需要监听自己前面一个节点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360