curator 介绍 party1

在如今服务器的开发与部署时,往往考虑的不是单机服务的承载力了,而是更高一阶,如何设计出高可用,高负载,高容量的服务架构。并且业务开发不是简单的将所有业务糅合在单台服务上,而是分模块,分功能分而治理,才有了微服务架构。然后微服务设计中的每一个模块都是可以设计为一个集群,例如常用的,用户模块,权限模块,商品模块,订单模块,支付模块,供应链等等。服务器的设计越来越复杂,对于开发人员的技术能力提出更高要求。
先如今微服务,分布式,集群的架构思想中,zookeeper成为了大多首选并且非常重要的中间件,例如我们熟悉的Hadoop,dubbo中,都出现了zookeeper 。zookeeper有很多特性,例如有注册订阅,分布式锁,队列等等功能。但是zookeeper中提供的java api 并不是很友好,使用起来容易踩坑。例如创建path时,需要判断path的parent是否存在,必须先创建parent path才能创建子路径。还有在添加watcher事件时,一旦该事件触发一次后,如果没有主动将事件重新设置,他不会收到第二次。还有其他一些不太友好的api开发就不在赘述。
所以才引入了curator工具,他实际是更高级的api,使用起来更加方便。但内部核心也是使用zookeeper提供的api,只是在开发中不那么繁琐而已。
举个创建path例子:

  public void createPath() {
        String host = "127.0.0.1:2181";
        String path = root + "/my_path";
        CuratorFramework curator = CuratorClient.create(host);
        try {
            String last = curator.create().creatingParentsIfNeeded().forPath(path, "123".getBytes());
            logger.info("创建路径完成 " + last);
        } catch (Exception e) {
            e.printStackTrace();
            logger.info("创建路径失败 异常类型:" + e.getClass().getName() + ", message:" + e.getMessage());
        }
    }

创建路径,是不是很简单,不需要关心root是否已经创建。curator自己会去做验证判断是否需要创建root路径。然而,我们再升华一下,既然zookeeper提供了很多特性,那么curator是否也能足够支撑呢?在curator组件中,recipes模块中,可以了解到很多有意思的地方:


image.png

可以看到他提供了很多功能,例如原子计算,栅栏,缓存,选举,锁,队列等,提供了很丰富的功能。 那么我们来分析一下 curator如何利用zookeeper的特性,实现这些功能的。

首先需要了解一些基本尝试,例如zookeeper中Watcher有哪些事件

public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);
}

包含了节点创建,节点删除,节点数据变革和子节点变更,这些是zookeeper自己的watcher事件类型。
那么curator组件还会提出哪些自己的事件呢?

public enum CuratorEventType
{
    /**
     * Corresponds to {@link CuratorFramework#create()}
     */
    CREATE,

    /**
     * Corresponds to {@link CuratorFramework#delete()}
     */
    DELETE,

    /**
     * Corresponds to {@link CuratorFramework#checkExists()}
     */
    EXISTS,

    /**
     * Corresponds to {@link CuratorFramework#getData()}
     */
    GET_DATA,

    /**
     * Corresponds to {@link CuratorFramework#setData()}
     */
    SET_DATA,

    /**
     * Corresponds to {@link CuratorFramework#getChildren()}
     */
    CHILDREN,

  //....后面还有很多 事件
}

这些事件与zookeeper没有直接关系,而是curator通过调用相应api后,会触发相应的事件,例如调用create()方法,会触发CREATE事件。如果调用checkExists方法,会触发EXISTES事件。

cache包

该包内主要熟悉NodeCache和PathChildrenCache

• NodeCache,是指可以从本地cache中得到节点数据,并且该node可以增加watcher事件,例如节点的 更新/创建/删除。然后重新拉取数据,然后通过本地注册的listeners,他们会得到变更通知。

    private final CuratorFramework client;
    private final String path;
    private final boolean dataIsCompressed;
    private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    private final AtomicBoolean isConnected = new AtomicBoolean(true);
    private ConnectionStateListener connectionStateListener = new ConnectionStateListener();
    private Watcher watcher = new Watcher();

这些是NodeCache的基本属性,
listeners是存储了节点缓存变更的监听器。
data是当前节点的存储的数据,从zookeeper节点上缓存在本地的数据
connectionStateListener是连接状态变更监听器,例如重连,掉线等事件
watcher就是与zookeeper中的一样,针对路径进行监听。

    public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
    {
        this.client = client;
        this.path = PathUtils.validatePath(path);
        this.dataIsCompressed = dataIsCompressed;
    }

普通的构造器,最重要的是一个path路径和client,当声明对象后,就要启动该NodeCache。

    public void     start(boolean buildInitial) throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        client.getConnectionStateListenable().addListener(connectionStateListener);

        if ( buildInitial )
        {
            client.checkExists().creatingParentContainersIfNeeded().forPath(path);
            internalRebuild();
        }
        reset();
    }

首先会将connectionStateListener状态监听器添加到client状态监听列表中。如果buildInitial=true,需要初始化,那么尝试创建parent,然后获取zk上的节点数据。最终执行reset方法。


    private void     reset() throws Exception
    {
        if ( (state.get() == State.STARTED) && isConnected.get() )
        {
            client.checkExists().creatingParentContainersIfNeeded()
              .usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }

reset方法其实将watcher添加path路径中,并且针对checkExist方法增加回调方法backgroundCallback,那么该回调拿到的CuratorEvent事件肯定是EXIST事件。其实rest并没有获取节点数据。
看一下最终调用方法processBackgroundResult()方法:

    private void processBackgroundResult(CuratorEvent event) throws Exception
    {
        switch ( event.getType() )
        {
            case GET_DATA:
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                    setNewData(childData);
                }
                break;
            }

            case EXISTS:
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    setNewData(null);
                }
                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    if ( dataIsCompressed )
                    {                       
                          client.getData().decompressed().usingWatcher(watcher)
                                                 .inBackground(backgroundCallback).forPath(path);
                    }
                    else {
                        client.getData().usingWatcher(watcher)
                                  .inBackground(backgroundCallback).forPath(path);           
                    }
                }
                break;
            }
        }
    }

当限制任然是EXISTS事件时,判断是否有节点,如果没有则setNewData方法,即设置Node的data数据为空。那么如果存在节点,他然后没有去主动获取得到data数据,怎么做的?看一下他执行了getData()方法,并且添加了watcher事件,但是任然通过回调方法,那么此时回调方法是GET_DATA事件了。最终processBackgroundResult方法是执行了case GET_DATA这块代码。因为此时发起获取数据时,会将数据添加到CuratorEvent中,此时生成了ChildData对象,包括了path,stat和节点数据等信息。
考虑一下zookeeper中Watcher是什么时候才能触发事件,当然是节点删除,更新,或者创建才会发起,那么怎么才能使用在NodeCache中。再来看一下NodeCache自带的属性watcher实现类

    private Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            try
            {
                reset();
            }
            catch(Exception e)
            {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

其实他任然调用reset方法,就是这么简单。还是先是发起EXISTS事件,然后GET_DATA事件。但是考虑清楚,watcher事件是一次性触发功能,不会执行第二次,所以在reset中,都会对path添加watcher事件。
那么NodeCache节点变更,如何通知添加在监听容器内的监听器的?

    private void setNewData(ChildData newData) throws InterruptedException
    {
        ChildData   previousData = data.getAndSet(newData);
        if ( !Objects.equal(previousData, newData) )
        {
            listeners.forEach
            (
                new Function<NodeCacheListener, Void>()
                {
                    @Override
                    public Void apply(NodeCacheListener listener)
                    {
                        try
                        {
                            listener.nodeChanged();
                        }
                        catch ( Exception e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error("Calling listener", e);
                        }
                        return null;
                    }
                }
            );
//.....
    }

在setNewData方法中,通过与原子应用的data中之前存储的previousData比较,如果不同。则那么需要遍历容器内的监听器了,最终执行nodeChanged方法。

• PathChildrenCache 子路径缓存
考虑一下,既然curator能在集群中使用,那么举个最简单的例子,在集群中,增加或者减少服务,需要及时发现才能防止继续调用该服务。那么在curator如何使用?当然在集群中同等服务功能中每台服务都是作为一个节点角色使用的,那好,只要监听节点的变化例如节点移除,或者节点增加了。就能知道服务集群中的变更,那么节点该有哪些标识,可以用服务器的ip和端口组成唯一标识。
所以,引申出来几个概念,
1.需要监听的节点都是某个业务下parent的子节点children,2.针对添加子节点,任然必须在parent下变更;
在curator中引入了子节点管理的几个事件
PathChildrenCacheEvent下的Type类型:

    public enum Type
    {
        /**
         * A child was added to the path
         */
        CHILD_ADDED,

        /**
         * A child's data was changed
         */
        CHILD_UPDATED,

        /**
         * A child was removed from the path
         */
        CHILD_REMOVED,
        //还有其他事件
}

例如子节点新增,变更,删除等其他事件。在curator采用了大量的异步调用线程,并且在PathChildrenCache中通过推送事件方式通知节点状态变更的。
RefreshOperation 刷新事件,主要调用PathChildrenCache中refresh方法;
GetDataOperation 获取节点数据事件,主要调用getDataAndStat方法(),异步方式得到节点数据
EventOperation 推送事件,推送给记录在事件容器中的监听器,发起childEvent方法
那么在PathChildrenCache 分为2种形式的基本路径,parentPath路径,和childPath路径,在zk中,已经提过,当对parentPath进行监听,如果parentPath新增节点,就会触发children事件。所以PathChildrenCache也是利用了这点。
在refresh方法中

    void refresh(final RefreshMode mode) throws Exception
    {
        ensurePath();

        final BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if (PathChildrenCache.this.state.get().equals(State.CLOSED)) {
                    // This ship is closed, don't handle the callback
                    return;
                }
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    processChildren(event.getChildren(), mode);
                }
            }
        };

        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
    }

对client添加了childrenWatcher,然后内部有个回调类callback,他接受的CuratorEventType类型肯定是CHILDREN事件。在processChildren方法中,

    private void processChildren(List<String> children, RefreshMode mode) throws Exception
    {
        Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
        for ( String child : children ) {
            removedNodes.remove(ZKPaths.makePath(path, child));
        }

        for ( String fullPath : removedNodes )
        {
            remove(fullPath);
        }

        for ( String name : children )
        {
            String fullPath = ZKPaths.makePath(path, name);

            if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
            {
                getDataAndStat(fullPath);
            }

            updateInitialSet(name, NULL_CHILD_DATA);
        }
        maybeOfferInitializedEvent(initialSet.get());
    }

其中children 是当前parent地下所有的子路径的名字(不是完整的路径)。与本地记录的当前数据,比较出需要移除的节点,发送EventOperation事件中的子节点移除事件。然后通过RefreshMode模式或者当前currentData中没有保护子节点的全路径,那么需要获取数据。
在getDataAndStat()方法中

    void getDataAndStat(final String fullPath) throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null);
            }
        };

        if ( USE_EXISTS && !cacheData )
        {
            client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
        }
        else
        {
            // always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            if ( dataIsCompressed && cacheData )
            {
                client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
            }
            else
            {
                client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
            }
        }
    }

通过是否需要保持节点数据执行相应的方法,但是这里都有个共同的是,对该子节点全路径添加dataWatcher事件,那么该路径的删除或者变更,都会通知到dataWatcher事件中。
在PathChildrenCache中存在2中Watcher事件对象

    private volatile Watcher childrenWatcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
        }
    };

    private volatile Watcher dataWatcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            try
            {
                if ( event.getType() == Event.EventType.NodeDeleted )
                {
                    remove(event.getPath());
                }
                else if ( event.getType() == Event.EventType.NodeDataChanged )
                {
                    offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

有针对parent路径添加childrenWatcher事件,基本上子节点变更,都会触发,然后通过刷新事件,异步方式重新执行refresh方法。
由这对具体的child路径田间dataWatcher事件,主要是子节点路径删除,或者数据变更,做出相应的移除事件或者获取数据事件动作。
熟悉了PathChildrenPath的工作原理,那么在工作中如何整合。首先我们要声明自己的PathChildrenCacheListener 监听器实现类,有了他,才能知道节点的变更情况。 假设有个业务功能是多组服务器支撑提供,需要保证他能可动态调整服务器资源。那么上游调用者就可以通过PathChildrenPath工具监听当前提供服务器组有哪些,而不用实时关心,在发起调用时,去判断当前存在的服务器信息了。

locks包

在分布式系统中,如果要使用公用某一资源时候,往往会申请一个分布式锁。curator也提供了分布式锁,利用了zk的特性。使用方式很简单:

    public void testDistributeLock() throws Exception {
        String host = "127.0.0.1:2181";
        String path = root + "/lock_test";
        CuratorFramework curator = CuratorClient.create(host);
        InterProcessMutex mutex = new InterProcessMutex(curator, path);
        if(mutex.acquire(10, TimeUnit.SECONDS)) {
            try {
                // 业务逻辑
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mutex.release();
            }
        }
    }

path就是比作资源,锁针对path就行资源获取,然后执行业务逻辑,最终都需要release锁资源。观察一下InterProcessMutex工作原理。
首先通过acquire方法了解internalLock方法

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

为什么通过当前线程得知锁的状态数据,加入从threadData拿到了lockData数据,说明在这个线程之前就已经获取锁资源了,如果重复获取同一个锁,那么只要记录lockCount数量即可。当前线程没有保存的锁资源,需要通过internals内置锁工具尝试获取锁,最终得到一个lockPath,然后进行封装成LockData保存在threadData中,没有返回路径,说明获取锁失败了。
看一下LockInternals 类attemptLock方法,是如何尝试获取锁

    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
    {
        this.driver = driver;
        this.lockName = lockName;
        this.maxLeases = maxLeases;

        this.client = client;
        this.basePath = PathUtils.validatePath(path);
        this.path = ZKPaths.makePath(path, lockName);
    }

在构造器中,有两个路径,basePath基本路径,还有path这个路径是通过basePath与lockName(默认名字为lock-)组合起来,说明这里的path是basePath的子节点路径。还有一个参数 driver,默认通过StandardLockInternalsDriver类实现的,该类主要负责创建路径,判断是否能获取锁。看一下StandardLockInternalsDriver创建路径代码:

    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
                  .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
                  .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

其中path路径时基本路径与lockName组合起来的最终路径名称,但是创建的时候,采用了EPHEMERAL_SEQUENTIAL模式得到的路径,首先路径是非永久状态存储的,如果连接端口,该ourPath就会删除。然后还有特点是有序的,就是ourPath的路径是path路径与顺序编号组合在一起的,并且是有序递增编号的路径,例如 /test/lock-000001,test/lock-000002。每次创建都会增加编号,而且不会重复,这是zookeeper中一个特性。
再来熟悉LockInternals中的attemptLock方法。

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;
            try {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e ) {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                    isDone = false;
                }
                else  {
                    throw e;
                }
            }
        }

        if ( hasTheLock ) {
            return ourPath;
        }

        return null;
    }

为什么在一个循环体内,是为了容错,在创建ourPath失败时,进行重复尝试。通过driver创建了一个非持久的并且有序编号的ourPath路径,那么考虑一下,因为路径时的编号是递增的,那么编号越小,那么他获得锁的概率应该是最大的,因为他是最早创建路径,也就分配的编号小了。当定义完这个获取锁的规则后,后续就方便很多了。
在internalLockLoop方法中,如何与等待时间相结合,当获得锁后,就可以成功呢?

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try {
            if ( revocable.get() != null ) {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() ) {
                    haveTheLock = true;
                }
                else {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this) {
                        try  {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null ) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 ) {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
                                wait(millisToWait);
                            }
                            else {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )  {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

在while循环体内,获取了所有basePath下的children节点名称,并且进行从小到大编号排序。然后通过driver中的getsTheLock方法得到我节点的上一个节点名称,如果不存在,说明我的节点是第一个,那么我就能获取锁。
如果存在前一个节点,构建完整的路径previousSequencePath,并对该路径进行监听,增加watcher事件。为什么要这么?还是那个问题,编号是有序递增的,只有当我前一个节点释放锁了,下一个是我,我就能得到锁。那么前一个节点如何释放锁,可以主动删除节点,或者掉线系统自动删除。在对previousSequencePath添加watcher事件后,进入等待,那么当前线程等待时通过谁来唤醒呢?当然是通过watcher来唤醒,通过调用notifyAll方式唤醒线程,然后重新执行循环,知道超时,或者得到锁。这里有个需要考虑,在超时时,将doDelete标记为删除,然后再finally方法中通过这个状态去删除ourPath节点,为什么要这样呢?因为超时情况下,认定是没有获取锁,但是路径我已经创建了,如果不去主动删除,那么他会一直占用,在ourPath后面的路径就会一直等着他主动删除。在考虑一下,这里为什么会存在 KeeperException.NoNodeException异常呢?因为在对previousSequencePath进行监听时,假设这个锁刚好释放了,已经删除了previousSequencePath路径,那么当前去监听时,路径就会不存在,然后会抛出节点不存在的异常。
这是一个完整的获取锁的流程,也很严谨的处理各种出现异常时的逻辑。当然获取锁,用完就要进行释放。
在Mutex中的release方法中

    public void release() throws Exception
    {        
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )   {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 ) {
            return;
        }
        if ( newLockCount < 0 ) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

首先释放时,从LockData中对lockCount进行扣减,如果任然大于0,就return了,表示该锁没有用完。最终通过internals执行releaseLock方法,然后移除掉threadData中的currentThread数据,在LockInternal中释放就很简单了,就是调用删除路径功能,达到释放资源效果。那么监听此时的lockPath时,就能监听到删除事件,就会获取锁。
在java并发包中存在读写锁,那么在curator中也存在这样的读写分布式锁-InterProcessReadWriteLock。
大致与InterProcessMutex思想一致的,但是内部有两个InterProcessMutex组成,一个是readMutex,一个是writeMutex。与Java中的ReentrantReadWriteLock一样,如果当前是read获取到了资源,那么另外一个read线程也能获取资源,都是read资源是不进行互斥的,但是如果有write资源,那么就会互斥。写与写资源也是存在互斥的。所以InterProcessReadWriteLock是如何实现功能的?
无论read,write锁,他们的basePath肯定是一致的,而且在zk中,创建子节点为序列化的时候,不会因为子节点的名称不一样,编号会重置。而且同等对待,编号永远是有序递增的。那么好了,writeMutex就是互斥锁,与什么请求资源无关,readMutex只要判断在之前的节点中存在write路径,那么就需要等待。那么怎么判断呢。在StandardLockInternalsDriver中有个getsTheLock方法,该方法返回的PredicateResults结果才能知道是否能拿到锁,或者对前一个路径进行监听。
在readMutex锁重新了该方法

    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
    {
        if ( writeMutex.isOwnedByCurrentThread() )
        {
            return new PredicateResults(null, true);
        }

        int         index = 0;
        int         firstWriteIndex = Integer.MAX_VALUE;
        int         ourIndex = -1;
        for ( String node : children )
        {
            if ( node.contains(WRITE_LOCK_NAME) )
            {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            }
            else if ( node.startsWith(sequenceNodeName) )
            {
                ourIndex = index;
                break;
            }

            ++index;
        }

        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

        boolean     getsTheLock = (ourIndex < firstWriteIndex);
        String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }

首先判断是否是重复锁,然后查询第一个firstWriteIndex写路径的位置,与ourIndex自己的位置,进行比较。如果ourIndex小,那么就可以获得锁了,如果大,那么需要监听firstWriteIndex的对应的路径了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容