分布式锁的一些实现框架的笔记

在了解了java JDK锁的相关知识后准备了解一些关于分布式锁的知识。关于jdk里面的锁实现其实纠其根本大多数是通过竞争state来进行执行权限的,其实分布式锁类似,只是分布式锁将竞争的资源放在了一个多个应用(多个jvm)能够共同访问的位置上。比如redis实现分布式锁的框架 redission,又比如zookeeper大家无外乎都是讲一个字符串,亦或者一个节点放到一个多个服务都可以访问的位置(redis-server)或者zookeeper的server。

本文先从zookeeper里面是如何实现分布式锁的开始。在开始聊这个话题之前我们需要先想清楚一个问题这些能够被大家共享的server是不是在保存这些代表着锁含义的字符串或者节点的时候要保障线程安全,答案是肯定的,我们知道redis是没问题,但是zookeper是如何保障这个问题的呢。
接下来我们先将这个问题摆平,也就是先将zookeeper,server内部的一些线程安全机制搞清楚,这是实现分布式锁的关键,不然并发如果不能控制那么就会造成非常多的隐患。我们知道zookeeper是通过datatree来存数据节点的。虽然不是zookeeper源码专场但是我们为了表示对源码的尊重还是要下载一下的看下底层的一个实现机制
下载好源码我们先从zookeeper的bin目录里面找到zkServer.sh这是zookeeper启动的脚本程序,通过阅读这段脚本我们看到他的入口

  if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
  echo "ZooKeeper JMX enabled by default" >&2
  if [ "x$JMXPORT" = "x" ]
  then
    # for some reason these two options are necessary on jdk6 on Ubuntu
    #   accord to the docs they are not necessary, but otw jconsole cannot
    #   do a local attach
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
  

从源码中找到这个类QuorumPeerMain,从源码中找到这个类为了避免篇幅过长这里不贴代码了
程序入口为main方法--》main.initializeAndRun(args)--》 runFromConfig(config);下面的代码是runFromConfig的代码片段找到我们想要找的内容

 
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

我们看到非常多代码,在这部分代码的第二行可以看到ZKDatabase被创建,打开ZKDatabase类可以看到里面有一个成员属性DataTree,这个DataTree就是我们要找的zookeeper中存储数据的对象。代码截取一部分如下

 /**
     * This map provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
存储node的集合
    private final NodeHashMap nodes;
数据监听器集合
    private IWatchManager dataWatches;
节点监听器集合
    private IWatchManager childWatches;
 /** cached total size of paths and data for all DataNodes */
  当前集合中拥有的node计数器,利用的是线程安全的Long类型原子计数器  
    private final AtomicLong nodeDataSize = new AtomicLong(0);

接下来带着我们的问题去看一下create方法

/**
     * Add a new node to the DataTree.
     * @param path
     *            Path for the new node.
     * @param data
     *            Data to store in the node.
     * @param acl
     *            Node acls
     * @param ephemeralOwner
     *            the session id that owns this node. -1 indicates this is not
     *            an ephemeral node.
     * @param zxid
     *            Transaction ID
     * @param time
     * @param outputStat
     *            A Stat object to store Stat output results into.
     * @throws NodeExistsException
     * @throws NoNodeException
     */
    public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = createStat(zxid, time, ephemeralOwner);
        DataNode parent = nodes.get(parentName);

        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            // Add the ACL to ACL cache first, to avoid the ACL not being
            // created race condition during fuzzy snapshot sync.
            //
            // This is the simplest fix, which may add ACL reference count
            // again if it's already counted in in the ACL map of fuzzy
            // snapshot, which might also happen for deleteNode txn, but
            // at least it won't cause the ACL not exist issue.
            //
            // Later we can audit and delete all non-referenced ACLs from
            // ACL map when loading the snapshot/txns from disk, like what
            // we did for the global sessions.
            Long longval = aclCache.convertAcls(acl);

            Set<String> children = parent.getChildren();
            if (children.contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }

            nodes.preChange(parentName, parent);
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }
            // There is possibility that we'll replay txns for a node which
            // was created and then deleted in the fuzzy range, and it's not
            // exist in the snapshot, so replay the creation might revert the
            // cversion and pzxid, need to check and only update when it's
            // larger.
            if (parentCVersion > parent.stat.getCversion()) {
                parent.stat.setCversion(parentCVersion);
                parent.stat.setPzxid(zxid);
            }
            DataNode child = new DataNode(data, longval, stat);
            parent.addChild(childName);
            nodes.postChange(parentName, parent);
            nodeDataSize.addAndGet(getNodeSize(path, child.data));
            nodes.put(path, child);
            EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
            if (outputStat != null) {
                child.copyStat(outputStat);
            }
        }
        // now check if its one of the zookeeper node child
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if its the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
            }
        }
        // also check to update the quotas for this node
        String lastPrefix = getMaxPrefixWithQuota(path);
        long bytes = data == null ? 0 : data.length;
        if (lastPrefix != null) {
            // ok we have some match and need to update
            updateCountBytes(lastPrefix, bytes, 1);
        }
        updateWriteStat(path, bytes);
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
    }

这段代码通过传入的path截取后获得对应path的父节点,从成员变量nodes中取出对应的path结点对象,通过synchronized 关键字进行并发控制,这点其实也采用了分而治之的思想,将锁的力度尽可能的缩小,提升并发性能。因为synchronized 在JVM 中的实现是通过锁定对象的head完成的临界控制。好,我们想要的答案截止到目前就出来了,也就是说在zookeper的server端帮我们进行了并发的控制,确保了分布式锁在加锁时候的线程安全性
另外这里mark一下在这段代码的最后几行其实是在触发监听事件。这里运用了设计模式中的观察者模式,能够将create事情传播出去,调用相应的注册在服务端的监听器,将通知压入待发送队列。当然这里面涉及到与client的交互,那属于socket层面的话题了,不做展开。其他的操作与监听完全类似。

好下面开始分析curator这个zookeeper客户端对于分布式锁的源码部分

先从使用入手,感受一下这货带来的简单api(如丝般顺滑)

public class Recipes_Lock {

    static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
        final CountDownLatch down = new CountDownLatch(1);
        for(int i = 0; i < 30; i++){
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                        lock.acquire();
                    } catch ( Exception e ) {}
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的订单号是 : "+orderNo);
                    try {
                        lock.release();
                    } catch ( Exception e ) {}
                }
            }).start();
        }
        down.countDown();
    }
}

熟悉的acquire() 熟悉的lock(),离不开李大爷的恩泽。
先点开使用到的类InterProcessMutex我将核心的属性列到下面进行标记

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
    可以理解为锁对象的引用
    private final LockInternals         internals;
    锁定的路径
    private final String                basePath;
    线程与锁定数据的并发集合,用于检查当前jvm中当前线程是否已经在zookeeper中加锁,如果有了则进行计数器自增,避免给server造成过大压力。
    private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();
threadData中要存的value对象的类型。主要用于保存当前线程引用,与重入次数
    private static class LockData
    {
        final Thread        owningThread;
        final String        lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
}

下面看下acquire方法

/**
     * Acquire the mutex - blocking until it's available. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

 private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        取到当前线程
        Thread          currentThread = Thread.currentThread();
        通过并发集合进行线程安全的获取当前线程有无对应的数据
        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            如果有则证明当前jvm中这个线程重入了进行重入自增
            lockData.lockCount.incrementAndGet();
            return true;
        }
        当前没有缓存,那么就进行尝试加锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {  
            如果能够返回锁定路径则说明加锁成功那么就创建一个计数器对象,交给并发集合进行保存。
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

我们着重再看上面代码中的 internals.attemptLock

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {    如果有要写入的数据
                if ( localLockNodeBytes != null )
                {
                    通过api写入基于会话的递增的节点,并且将数据写入进去
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                没有数据写入的情况则建立结点就好了。
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

上面代码是进行尝试加锁,但是是否能够加锁成功还要靠internalLockLoop方法

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( stat != null )
                        {
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                    }
                    // else it may have been deleted (i.e. lock released). Try to acquire again
                }
            }
        }
        catch ( Exception e )
        {
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

这段代码的实现逻辑是先将要加锁的路径将孩子结点取出来,然后将上一步取得的node的path进行比较没如果自己的序号比其他同级孩子结点小那么则加锁成功,否则注册监听事件到比当前path小的那个结点上面去。

综上所述,进行一个简单的总结后面再慢慢完善,zookeeper的server通过保存一个datatree将数据节点进行保存,然后当客户端进行节点创建时候为了达到并发与线程安全的折中,使用了一个ConcurrentHashMap进行并发控制,并且为了减小锁的粒度,是将客户端需要创建的子节点的父亲节点进行加锁,这样就降低了并发阻塞,不使用全局阻塞(一开始没想明白后来看了源码才懂)。这样就解决了服务端线程安全的问题,防止出现锁覆盖等问题。
客户端在进行加锁时候先是创建一个加锁的对象,锁对象里面有一个当前锁加锁成功的ConcurrentHashMap集合,这么做的目的一方面是能够准确的进行重入计数,不降数据存储进去zookeeper更好的是将压力丢给client而不是去轮训zookeeper-server,这点在redission中使用了分桶进行加锁字符串,缓存的思路是一致的。比较好的降低了轮训给服务端带来的额外开支。

另外客户端在实现加锁时候如果本地缓存中没有线程对象则开始调用接口向远程server写入基于session的节点,之所以支持会话是在当前jvm崩溃后能够释放掉锁。写入成功后则返回path,拿到path后,进行当前path下,兄弟节点的查询,因为server的自增性,所以如果当前节点的序号是最小的就认为当前线程拿到了锁,如果不是最小的,则注册一个监听事件给比他大的最近的那个node(删除事件)。程序进入等待。
以上就是对zookeeper-servery,与使用curator进行分布式加锁的一些源码的阅读心得与体会。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351