curator 介绍 party2

atomic包

在java.util并发包中,有很多继承number的原子计数类,例如AtomicLong, AtomicInteger等等。在分布式开发过程中,往往也有相应的需求对同一资源进行计数。在curator中也提供了分布式原子计数类,例如DistributedAtomicInteger,DistributedAtomicLong。在zk中每一个节点,可以存储相应的数据,这些数据是二进制方式存储的。不光有对应的数据,还有节点存储的状态信息。
在zookeeper包中DataNode类中,有对应的状态对象StatPersisted

public class StatPersisted implements Record {
  private long czxid;
  private long mzxid;
  private long ctime;
  private long mtime;
  private int version;
  private int cversion;
  private int aversion;
  private long ephemeralOwner;
  private long pzxid;
  // getter and setter...
}

熟悉一下几个重要的属性
cZxid:数据节点创建时的事务ID
ctime:数据节点创建时的时间
mZxid:数据节点最后一次更新时的事务ID
mtime:数据节点最后一次更新时的时间
version:数据节点的版本号
有一个记录节点数据变更的属性version,通过测试观察,对节点数据变更,version版本也会随着变化,会逐渐递增。在curator的分布式原子计数类,也真是通过该特性实现的。
• DistributedAtomicInteger
举个integer类型的分布式原子计数,考虑一下,因为zk DataNode中存储的是二进制数据,那么获取数据肯定需要通过二进制转成integer,或者更新数据时需要将integer转换成二进制。其实本质就是二进制数据的变更,与需要Integer,或者Long类型没有关系。所以方法中 get,increment,decrement等等,通过DistributedAtomicValue类实现的。
举一个increment方法为例

    private AtomicValue<Integer>   worker(final Integer addAmount) throws Exception {
        Preconditions.checkNotNull(addAmount, "addAmount cannot be null");

        MakeValue               makeValue = new MakeValue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
                int        previousValue = (previous != null) ? bytesToValue(previous) : 0;
                int        newValue = previousValue + addAmount;
                return valueToBytes(newValue);
            }
        };

        AtomicValue<byte[]>     result = value.trySet(makeValue);
        return new AtomicInteger(result);
    }

在DistributedAtomicInteger中increment或者decrement方法,都是调用worker方法,只是参数中addAmount 可以设置为1,或-1表示增加或者减少。其中有个内部类MakeValue该接口主要实现了将int转换成byte数组了。

    AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

在DistributedAtomicValue中,如何设置更新zk节点值。首先,是通过tryOptimistic方法,该方法采用乐观方式去更新,如果更新失败,最后通过方法tryWithMutex 分布式锁去更新数据。

    private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
    {
        long            startMs = System.currentTimeMillis();
        int             retryCount = 0;

        boolean         done = false;
        while ( !done )
        {
            result.stats.incrementOptimisticTries();
            if ( tryOnce(result, makeValue) )
            {
                result.succeeded = true;
                done = true;
            }
            else
            {
                if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                {
                    done = true;
                }
            }
        }

        result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
    }

乐观方式更新,主要思想是多次更新,直到更新成功或者尝试次数,时间已经到达最大值。看一下tryOnce方法:

    private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception {
        Stat        stat = new Stat();
        boolean     createIt = getCurrentValue(result, stat);

        boolean     success = false;
        try {
            byte[]  newValue = makeValue.makeFrom(result.preValue);
            if ( createIt ) {
                client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
            } else {
                client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
            }
            result.postValue = Arrays.copyOf(newValue, newValue.length);
            success = true;
        }
        catch ( KeeperException.NodeExistsException e ) 
            // do Retry
        }
        catch ( KeeperException.BadVersionException e )  {
            // do Retry
        }
        catch ( KeeperException.NoNodeException e ) {
            // do Retry
        }
        return success;
    }

首先需要获取当前节点的值,通过getCurrentValue方法,并且stat状态信息也得到了最新的值。在getCurrentValue方法也能知道createIt状态,节点是否存在然后去创建。如果不需要创建的时候,更新值时,会withVersion(stat.getVersion())执行该方法,就是带了最近的stat中version信息。在抛异常时,一定要合理处理,
例如:
NodeExistsException异常,说明创建的时候,已经存在了,需要重试;
BadVersionException异常,版本异常,通过stat.getVersion()控制,如果zk当前的记录节点的version与请求的version不一致时拒绝更新,返回异常。这个异常在更新时最为常见;
NoNodeException异常,可能节点突然被删除了,那么节点不存在了,需要重试;
通过BadVersionException异常中,看到一些与数据库设计中乐观锁很相似。
既然乐观更新失败,那么需要更加严格方式,例如通过锁,就执行了互斥方式更新

    private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception {
         long            startMs = System.currentTimeMillis();
         int             retryCount = 0;

         if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )  {
             try {
                 boolean         done = false;
                 while ( !done )
                 {
                     result.stats.incrementPromotedTries();
                     if ( tryOnce(result, makeValue) ) {
                         result.succeeded = true;
                         done = true;
                     }
                     else {
                         if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) ) {
                             done = true;
                         }
                     }
                 }
             }
             finally {
                 mutex.release();
             }
         }
         result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
    }

他很简单,就是在更新值前,通过mutex尝试获取锁。其他与乐观方式基本一致。
在分布式原子计数中,都有可能会设置失败的,需要我们后续处理。
在DistributedAtomicInteger中,任然以increment举例,他返回的对象是AtomicValue接口,其中包含了判断是否成功succeeded方法。

leader包

选举包,在分布式开发,选举leader也是很重要的功能。因为在分布式服务中,服务器的角色是对等,但是有些业务往往需要通过leader单独处理,不能同时多个服务处理业务,可能会造成数据功能混乱等。
在curator中 选举leader提供了2中方式:
LeaderLatch, LeaderSelector。其中LeaderSelector实现原理很简单,利用Mutex互斥锁,获取锁即是leader,利用实现LeaderSelectorListener接口,takeLeadership方法就是当是leader时调用该方法,该方法执行完后,会主动释放leader(释放锁),让其他线程或者服务争夺leader。
LeaderLatch,稍微复杂,但是本质与分布式锁的实现原理一致。首先如何知道获取leader,或者不是leader,通过实现接口 LeaderLatchListener

  /**
   * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
   *
   * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false.  If
   * this occurs, you can expect {@link #notLeader()} to also be called.
   */
  public void isLeader();

  /**
   * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
   *
   * Note that it is possible that by the time this method call happens, hasLeadership has become true.  If
   * this occurs, you can expect {@link #isLeader()} to also be called.
   */
  public void notLeader();

其中调用isLeader时,说明当前是leader;调用notLeader时,说明不是leader;其中实现方法即是开发人员需要去实现的业务逻辑。
在调用start方法时,启动线程后调用internalStart方法,最终调用reset方法

    void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }


其中创建的路径是EPHEMERAL_SEQUENTIAL模式的,说明节点是非持久且递增的。然后创建成功时候,调用callback回调类。确认创建成功后,开始判断是否是leader。setNode方法记录一下当前的节点名称,便于变更或者close时删除路径。那么getChildren方法,肯定是得到当前有多少子节点创建了

    private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

通过该方法能知道,他来控制选举产生leader,其实通过getChildren后,执行回调方法,最终执行checkLeadership方法。

    private void checkLeadership(List<String> children) throws Exception
    {
        final String localOurPath = ourPath.get();
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 )
        {
            setLeadership(true);
        }
        else
        {
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }

该方法核心思想也是,得到最小节点的编号,选出他会leader。如果当前不是leader,那么需要监听排名前一位的路径节点,如果前一位的路径变更,那么就需要重新执行getChildren方法了。

    private synchronized void setLeadership(boolean newValue)
    {
        boolean oldValue = hasLeadership.getAndSet(newValue);

        if ( oldValue && !newValue )
        { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener listener)
                    {
                        listener.notLeader();
                        return null;
                    }
                });
        }
        else if ( !oldValue && newValue )
        { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener input)
                    {
                        input.isLeader();
                        return null;
                    }
                });
        }

        notifyAll();
    }

确认leader状态变化,最后遍历所有的LeaderLatchListener。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容