curator创建zookeeper连接
1.初始化CuratorFramework client = CuratorFrameworkFactory.newClient()
1)入参:服务器IP地址,session超时时间,连接超时时间,重试策略
2)初始化ZookeeperFactory,实现newZooKeeper方法,该方法实现zookeeper的连接创建
3)初始化CuratorZookeeperClient,传入Watcher,但这个并不是传给zookeeper的,zookeeper回调时,会调到这个watcher,这个watcher会回调我们创建连接时的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));这个操作。可以用于我们对事件的监控与处理(eventReceived(CuratorFramework client, CuratorEvent event))。
4)初始化ConnectionState,将3)中的watcher传给ConnectionState的parentWatchers属性,ConnectionState也是最终传给zookeeper的watcher
2.创建连接client.start();
1)连接状态管理 connectionStateManager.start()
connectionStateManager中,BlockingQueue<ConnectionState>保存着连接状态的状态变化值,
start()方法中循环获取队列中的状态值,然后执行在创建client时 client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));进行状态变化的监控或处理
2)连接创建 client.start();
该方法主要执行的是ConnectionState的reset()方法,reset ()主要完成老连接的关闭,和新连接的创建,此处创建连接即调用初始化ZookeeperFactory实现的newZooKeeper方法
3.zookeeper回调watcher
传入zookeeper的watcher是ConnectionState对象,则回调时,则先调用ConnectionState中的process方法,此处会判断连接状态,SyncConnected,ConnectedReadOnly则连接成功,Expired连接过期,则重新调用2中的reset()方法,其他状态则连接失败。如果连接状态有变化则通过AtomicBoolean进行保存。
此处还会调用之前初始化进去的parentWatchers,回调到初始化CuratorZookeeperClient时传入的watcher。此处,会校验连接状态,并将连接状态加入ConnectionStateManager状态管理器重进行管理(ConnectionStateManager的BlockingQueue<ConnectionState>)。加入ConnectionStateManager管理的状态会在connectionStateManager.start()中获取到,并可以通过 client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));来监控或处理
//创建Curator连接
public void init(){
Assert.hasText(zkServer,"zkServer is empty");
Assert.hasText(zkPath,"zkPath is empty");
ip = IpUtils.getOneIpV4();
//RetryNTimes 重试策略,
client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
client.getUnhandledErrorListenable().addListener(new LcpErrorListener(ip));
client.start();
}
client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
//返回一个CuratorFrameworkImpl对象
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
//初始化ZookeeperFactory
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
初始化ConnectionState,HandleHolder
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
//这个watcher并不是真正传给zookeeper的watcher,传给zookeeper的是ConnectionState,
//ConnectionState中重写process(WatchedEvent event)方法中,会调用这个Watcher
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
//在这里,实现CuratorListener接口的listener重写eventReceived方法,接收zk事件信息
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
//zk连接状态的管理类, 状态发生变化时,回掉listener的
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
//K节点默认值为本机IP,ZK本身是不允许创建没有value的节点的,但curator允许,就是使用了该默认值
byte[] builderDefaultData = builder.getDefaultData();
//省略其他变量赋值
}
CuratorFramework.start();
@Override
public void start()
{
try
{
//开启 连接状态管理
connectionStateManager.start();
//CuratorZookeeperClient中的start方法,真正与ZK建立连接
client.start();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
CuratorZookeeperClient
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}
//ConnectionState 的start方法
state.start();
}
ConnectionState类
void start() throws Exception
{
log.debug("Starting");
ensembleProvider.start();
reset();
}
synchronized void reset() throws Exception
{
log.debug("reset");
//用来记录zookeeper实例创建次数
instanceIndex.incrementAndGet();
isConnected.set(false);
//连接开始时间
connectionStartMs = System.currentTimeMillis();
//HandleHolder 关闭老的zookeeper实例,重新构建新的helper
zooKeeper.closeAndReset();
//调用zookeeperFactory.newZooKeeper创建原生zookeeper连接
zooKeeper.getZooKeeper();
}
HandleHolder类
void closeAndReset() throws Exception
{
//如果有的话关闭之前的zookeeper实例,重构HandleHolder
internalClose(0);
helper = new Helper()
{
private volatile ZooKeeper zooKeeperHandle = null;
private volatile String connectionString = null;
@Override
public ZooKeeper getZooKeeper() throws Exception
{
synchronized(this)
{
if ( zooKeeperHandle == null )
{
connectionString = ensembleProvider.getConnectionString();
//这里创建zookeeper连接,传入的watcher就是 ConnectionState
zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper()
{
@Override
public ZooKeeper getZooKeeper() throws Exception
{
return zooKeeperHandle;
}
@Override
public String getConnectionString()
{
return connectionString;
}
@Override
public int getNegotiatedSessionTimeoutMs()
{
return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
}
};
return zooKeeperHandle;
}
}
@Override
public String getConnectionString()
{
return connectionString;
}
@Override
public int getNegotiatedSessionTimeoutMs()
{
return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
}
};
}
//通过上面的helper实现知道这里真正与ZK建立连接
ZooKeeper getZooKeeper() throws Exception
{
return (helper != null) ? helper.getZooKeeper() : null;
}
创建zookeeper连接之后,watcher接收zookeeper返回的连接事件并进行处理,这里的watcher就是ConnectionState类,执行其中的process方法
@Override
public void process(WatchedEvent event)
{
//这里为None说明收到的事件是ZK连接状态改变的事件
if ( event.getType() == Watcher.Event.EventType.None )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
//连接状态发生变化
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
//记录连接开始时间
connectionStartMs = System.currentTimeMillis();
//连接状态变化为已连接则记录新协商的回话超市时间
if ( newIsConnected )
{
//重置session超时时间
lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
}
}
}
//回调CuratorZookeeperClient创建时的watcher,
for ( Watcher parentWatcher : parentWatchers )
{
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
//回调CuratorZookeeperClient创建时的watcher
parentWatcher.process(event);
trace.commit();
}
}
// 获取当前连接状态
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
// AtomicBoolean isConnected = new AtomicBoolean(false); 原子boolean保存连接状态
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
//连接断开
case Disconnected:
{
isConnected = false;
break;
}
//连接成功
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
//权限验证失败连接失败
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
//连接过期
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
//处理连接过期
//调用ConnectionState.reset() 重新构建zookeeper连接
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
//当连接状态发生改变且不是会话过期时,检查ZK地址是否发生变化
if ( checkNewConnectionString )
{
String newConnectionString = zooKeeper.getNewConnectionString();
if ( newConnectionString != null )
{ //处理ZK地址发生变化
handleNewConnectionString(newConnectionString);
}
}
return isConnected;
}
parentWatcher.process(event);回调初始化CuratorZookeeperClient时传入的watcher
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{ //将zookeeper的event包装成CuratorEvent
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
}
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
//校验连接状态,并将状态加入connectionStateManager进行管理
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
OperationTrace trace = client.startAdvancedTracer("EventListener");
//去回调创建client时的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{ //将状态加入 阻塞队列中,在connectionStateManager.start()中循环获取该队列中的状态数据,并执行我们初始化client时的getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
connectionStateManager.start(); 开启连接状态管理
public void start()
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
private void processEvents()
{ //注意是个循环,一直在获取 上面那个阻塞队列中的状态值
while ( state.get() == State.STARTED )
{
try
{
//第一次ZK还没有建立连接,这里得到的就是用户指定的会话超时时间
int useSessionTimeoutMs = getUseSessionTimeoutMs();
long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
long pollMaxMs = useSessionTimeoutMs - elapsedMs;
//这个队列就是刚才放进去事件的队列
final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
if ( newState != null )
{
if ( listeners.size() == 0 )
{
log.warn("There are no ConnectionStateListeners registered.");
}
//这里仅仅就是回调监听器StandardListenerManager<ConnectionStateListener>
//client.getConnectionStateListenable().addListener(new ConnectionStateListener());
//连接状态变化
listeners.forEach(listener -> listener.stateChanged(client, newState));
}
//该值默认100,如果长时间没有收到事件变化就判断下会话是否过期
else if ( sessionExpirationPercent > 0 )
{
synchronized(this)
{
checkSessionExpiration();
}
}
}
catch ( InterruptedException e )
{
// swallow the interrupt as it's only possible from either a background
// operation and, thus, doesn't apply to this loop or the instance
// is being closed in which case the while test will get it
}
}
}