Curator源码分析--创建client

curator创建zookeeper连接
1.初始化CuratorFramework client = CuratorFrameworkFactory.newClient()
1)入参:服务器IP地址,session超时时间,连接超时时间,重试策略
2)初始化ZookeeperFactory,实现newZooKeeper方法,该方法实现zookeeper的连接创建
3)初始化CuratorZookeeperClient,传入Watcher,但这个并不是传给zookeeper的,zookeeper回调时,会调到这个watcher,这个watcher会回调我们创建连接时的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));这个操作。可以用于我们对事件的监控与处理(eventReceived(CuratorFramework client, CuratorEvent event))。
4)初始化ConnectionState,将3)中的watcher传给ConnectionState的parentWatchers属性,ConnectionState也是最终传给zookeeper的watcher
2.创建连接client.start();
1)连接状态管理 connectionStateManager.start()
connectionStateManager中,BlockingQueue<ConnectionState>保存着连接状态的状态变化值,
start()方法中循环获取队列中的状态值,然后执行在创建client时 client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));进行状态变化的监控或处理
2)连接创建 client.start();
该方法主要执行的是ConnectionState的reset()方法,reset ()主要完成老连接的关闭,和新连接的创建,此处创建连接即调用初始化ZookeeperFactory实现的newZooKeeper方法
3.zookeeper回调watcher
传入zookeeper的watcher是ConnectionState对象,则回调时,则先调用ConnectionState中的process方法,此处会判断连接状态,SyncConnected,ConnectedReadOnly则连接成功,Expired连接过期,则重新调用2中的reset()方法,其他状态则连接失败。如果连接状态有变化则通过AtomicBoolean进行保存。
此处还会调用之前初始化进去的parentWatchers,回调到初始化CuratorZookeeperClient时传入的watcher。此处,会校验连接状态,并将连接状态加入ConnectionStateManager状态管理器重进行管理(ConnectionStateManager的BlockingQueue<ConnectionState>)。加入ConnectionStateManager管理的状态会在connectionStateManager.start()中获取到,并可以通过 client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));来监控或处理

//创建Curator连接
public void init(){
        Assert.hasText(zkServer,"zkServer is empty");
        Assert.hasText(zkPath,"zkPath is empty");
        ip = IpUtils.getOneIpV4();
        //RetryNTimes 重试策略,
        client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
        client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
        client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
        client.getUnhandledErrorListenable().addListener(new LcpErrorListener(ip));
        client.start();
    }

client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));

 public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }
//返回一个CuratorFrameworkImpl对象
public CuratorFramework build()
        {
            return new CuratorFrameworkImpl(this);
        }

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
        //初始化ZookeeperFactory
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
       初始化ConnectionState,HandleHolder
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                builder.getWaitForShutdownTimeoutMs(),
                //这个watcher并不是真正传给zookeeper的watcher,传给zookeeper的是ConnectionState,
                //ConnectionState中重写process(WatchedEvent event)方法中,会调用这个Watcher
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        //在这里,实现CuratorListener接口的listener重写eventReceived方法,接收zk事件信息
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly(),
                builder.getConnectionHandlingPolicy()
            );

       //zk连接状态的管理类, 状态发生变化时,回掉listener的
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
       //K节点默认值为本机IP,ZK本身是不允许创建没有value的节点的,但curator允许,就是使用了该默认值
        byte[] builderDefaultData = builder.getDefaultData();
       //省略其他变量赋值
    }

CuratorFramework.start();

 @Override
    public void start()
    {
       try
          {
            //开启 连接状态管理
            connectionStateManager.start(); 
            //CuratorZookeeperClient中的start方法,真正与ZK建立连接
            client.start();
            }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }

CuratorZookeeperClient

 public void start() throws Exception
    {
        log.debug("Starting");

        if ( !started.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Already started");
        }
        //ConnectionState 的start方法
        state.start();
    }

ConnectionState类

void start() throws Exception
    {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

synchronized void reset() throws Exception
    {
        log.debug("reset");
        //用来记录zookeeper实例创建次数
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        //连接开始时间
        connectionStartMs = System.currentTimeMillis();
        //HandleHolder  关闭老的zookeeper实例,重新构建新的helper
        zooKeeper.closeAndReset();
        //调用zookeeperFactory.newZooKeeper创建原生zookeeper连接
        zooKeeper.getZooKeeper();  
    }

HandleHolder类

void closeAndReset() throws Exception
    {
        //如果有的话关闭之前的zookeeper实例,重构HandleHolder
        internalClose(0);
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;

            @Override
            public ZooKeeper getZooKeeper() throws Exception
            {
                synchronized(this)
                {
                    if ( zooKeeperHandle == null )
                    {
                        connectionString = ensembleProvider.getConnectionString();
                      //这里创建zookeeper连接,传入的watcher就是 ConnectionState
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }

                    helper = new Helper()
                    {
                        @Override
                        public ZooKeeper getZooKeeper() throws Exception
                        {
                            return zooKeeperHandle;
                        }

                        @Override
                        public String getConnectionString()
                        {
                            return connectionString;
                        }

                        @Override
                        public int getNegotiatedSessionTimeoutMs()
                        {
                            return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
                        }
                    };

                    return zooKeeperHandle;
                }
            }

            @Override
            public String getConnectionString()
            {
                return connectionString;
            }

            @Override
            public int getNegotiatedSessionTimeoutMs()
            {
                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
            }
        };
    }
//通过上面的helper实现知道这里真正与ZK建立连接
ZooKeeper getZooKeeper() throws Exception
    {
        return (helper != null) ? helper.getZooKeeper() : null;
    }

创建zookeeper连接之后,watcher接收zookeeper返回的连接事件并进行处理,这里的watcher就是ConnectionState类,执行其中的process方法

    @Override
    public void process(WatchedEvent event)
    {
        //这里为None说明收到的事件是ZK连接状态改变的事件
        if ( event.getType() == Watcher.Event.EventType.None )
        {
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            //连接状态发生变化
            if ( newIsConnected != wasConnected )
            {
                isConnected.set(newIsConnected);
                //记录连接开始时间
                connectionStartMs = System.currentTimeMillis();
                //连接状态变化为已连接则记录新协商的回话超市时间
                if ( newIsConnected )
                {
                    //重置session超时时间
                    lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                    log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                }
            }
        }
        //回调CuratorZookeeperClient创建时的watcher,
        for ( Watcher parentWatcher : parentWatchers )
        {
            OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
             //回调CuratorZookeeperClient创建时的watcher
            parentWatcher.process(event);
            trace.commit();
        }
    }
// 获取当前连接状态
private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        // AtomicBoolean isConnected = new AtomicBoolean(false); 原子boolean保存连接状态
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
        default:
        //连接断开
        case Disconnected:
        {
            isConnected = false;
            break;
        }
      //连接成功
        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }
        //权限验证失败连接失败
        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }
      //连接过期
        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            //处理连接过期
            //调用ConnectionState.reset() 重新构建zookeeper连接
            handleExpiredSession();
            break;
        }

        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }
         //当连接状态发生改变且不是会话过期时,检查ZK地址是否发生变化
        if ( checkNewConnectionString )
        {
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {  //处理ZK地址发生变化
                handleNewConnectionString(newConnectionString);
            }
        }
        return isConnected;
    }

parentWatcher.process(event);回调初始化CuratorZookeeperClient时传入的watcher

new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {  //将zookeeper的event包装成CuratorEvent
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                }


private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            //校验连接状态,并将状态加入connectionStateManager进行管理
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }

        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    OperationTrace trace = client.startAdvancedTracer("EventListener");
                    //去回调创建client时的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

void validateConnection(Watcher.Event.KeeperState state)
    {
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
            internalConnectionHandler.suspendConnection(this);
        }
        else if ( state == Watcher.Event.KeeperState.Expired )
        {  //将状态加入 阻塞队列中,在connectionStateManager.start()中循环获取该队列中的状态数据,并执行我们初始化client时的getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
            connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
            unSleepBackgroundOperations();
        }
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
    }


connectionStateManager.start(); 开启连接状态管理

public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }

 private void processEvents()
    {    //注意是个循环,一直在获取 上面那个阻塞队列中的状态值
        while ( state.get() == State.STARTED )
        {
            try
            {
                //第一次ZK还没有建立连接,这里得到的就是用户指定的会话超时时间
                int useSessionTimeoutMs = getUseSessionTimeoutMs();
                long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                long pollMaxMs = useSessionTimeoutMs - elapsedMs;

                //这个队列就是刚才放进去事件的队列
                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                if ( newState != null )
                {
                    if ( listeners.size() == 0 )
                    {
                        log.warn("There are no ConnectionStateListeners registered.");
                    }
                     //这里仅仅就是回调监听器StandardListenerManager<ConnectionStateListener>
                    //client.getConnectionStateListenable().addListener(new ConnectionStateListener());
                    //连接状态变化
                    listeners.forEach(listener -> listener.stateChanged(client, newState));
                }
                //该值默认100,如果长时间没有收到事件变化就判断下会话是否过期
                else if ( sessionExpirationPercent > 0 )
                {
                    synchronized(this)
                    {
                        checkSessionExpiration();
                    }
                }
            }
            catch ( InterruptedException e )
            {
                // swallow the interrupt as it's only possible from either a background
                // operation and, thus, doesn't apply to this loop or the instance
                // is being closed in which case the while test will get it
            }
        }
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容