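I have been reading the canal source code recently and had a number of questions: how is canal's HA mode implemented, how is the MySQL dump position determined, how does the canal client fetch data and ack it, how is MySQL primary/standby switchover handled, and so on. I will write a few source-code analyses around these questions; corrections and discussion are welcome. This article analyzes the source code behind HA on both the canal server and the canal client. Before reading, it helps to understand canal's overall architecture; see the official documentation. Let's first look at how the official documentation describes the HA mechanism.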
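HA Mechanism Design
canal's HA consists of two parts: canal server and canal client each have their own HA implementation.
canal server: to reduce dump requests against MySQL, an instance may be running on only one server at any given time; the same instance on the other servers stays in standby.
canal client: to preserve ordering, only one canal client at a time may perform get/ack/rollback on an instance; otherwise the order in which clients receive data cannot be guaranteed.
The whole HA mechanism relies mainly on two ZooKeeper features: watchers and EPHEMERAL nodes (whose lifetime is bound to the session).
- Before starting a canal instance, a canal server first makes a start attempt against ZooKeeper (implementation: create an EPHEMERAL node; whoever creates it successfully is allowed to start).
- Once the ZooKeeper node has been created, that canal server starts the corresponding canal instance; on the servers that failed to create the node, the instance stays in standby.
- As soon as ZooKeeper detects that the node created by canal server A has disappeared, it notifies the other canal servers, which repeat the first step and elect a new canal server to start the instance.
- Every time a canal client connects, it first asks ZooKeeper which server currently runs the canal instance and then connects to it; if the connection becomes unavailable, it tries to connect again.
The canal client approach is similar to the canal server approach: it also relies on competing for an EPHEMERAL node in ZooKeeper. The rest of this article analyzes the HA source code.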
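The election described in the list above can be reproduced without a ZooKeeper cluster. The following is a minimal in-memory sketch, not canal's code: `ElectionDemo`, `FakeZk`, and the two server addresses are hypothetical names made up for illustration. The fake imitates only two behaviours: a given ephemeral node can be created by one caller only, and the surviving watchers are notified once the owner disappears.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ElectionDemo {
    // Hypothetical in-memory stand-in for ZooKeeper; not a real client API.
    static class FakeZk {
        private final Map<String, String> ephemeral = new HashMap<>();        // path -> owner
        private final Map<String, Runnable> watchers = new LinkedHashMap<>(); // owner -> callback

        // Imitates creating an EPHEMERAL node: only the first caller succeeds.
        synchronized boolean tryCreate(String path, String owner) {
            return ephemeral.putIfAbsent(path, owner) == null;
        }

        synchronized String owner(String path) {
            return ephemeral.get(path);
        }

        synchronized void watch(String owner, Runnable onNodeDeleted) {
            watchers.put(owner, onNodeDeleted);
        }

        // Imitates a crashed owner: its nodes vanish and the survivors are notified.
        void crash(String owner) {
            List<Runnable> survivors;
            synchronized (this) {
                ephemeral.values().removeIf(owner::equals);
                watchers.remove(owner);
                survivors = new ArrayList<>(watchers.values());
            }
            for (Runnable watcher : survivors) {
                watcher.run();
            }
        }
    }

    static final String RUNNING = "/otter/canal/destinations/example/running";

    // Returns the active server before and after the first one crashes.
    static String[] run() {
        FakeZk zk = new FakeZk();
        for (String server : new String[] {"serverA:11111", "serverB:11111"}) {
            zk.tryCreate(RUNNING, server);                         // try to become active
            zk.watch(server, () -> zk.tryCreate(RUNNING, server)); // retry when the node disappears
        }
        String before = zk.owner(RUNNING);
        zk.crash(before);
        return new String[] {before, zk.owner(RUNNING)};
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println(result[0] + " -> " + result[1]);
    }
}
```

In this sketch the first server wins the node, and the second takes over only after the first one's node is removed, which is the standby-to-running transition the list describes.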
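Canal Server HA Implementation
First, how to enable HA mode in canal:
Add the following to canal.properties:
# ZooKeeper address to register with
canal.zkServers = 127.0.0.1:2181
# With this setting, positions are managed by PeriodMixedMetaManager, which records the ack position on a ZooKeeper node so that consumption can resume from the acked position after a failover
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
Next, let's look at how canal-server interacts with ZooKeeper at startup; this is the core of the server-side HA implementation.
The entry point is the main method of CanalLauncher; with canal.serverMode = tcp, the class that actually performs startup is CanalController.
When CanalController is constructed, it initializes canal's system directories in ZooKeeper.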
public CanalController(final Properties properties){
// ... code unrelated to HA omitted ...
final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
if (StringUtils.isNotEmpty(zkServers)) {
zkclientx = ZkClientx.getZkClient(zkServers);
// initialize the system directories below if they do not exist
// /otter/canal/destinations: stores instance information
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
// /otter/canal/cluster: stores canal-server node information
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}
// ... code unrelated to HA omitted ...
}
When CanalController starts:
public void start() throws Throwable {
logger.info("## start the canal server[{}:{}]", ip, port);
// create the working node for the whole canal server
final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
//1.
initCid(path);
if (zkclientx != null) {
//2.
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
// start the embedded server first
embededCanalServer.start();
// try to start the non-lazy instances
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// create the working node for the destination
if (!embededCanalServer.isStart(destination)) {
// 3. start the HA mechanism
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
}
// start the network interface
if (canalServer != null) {
canalServer.start();
}
}
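1. Creates an ephemeral "ip:port" node under /otter/canal/cluster, for example /otter/canal/cluster/10.33.200.132:11111.
2. Registers an IZkStateListener to watch the state of the connection to ZooKeeper, so that the ephemeral "ip:port" node is created again when a new session is established after the old one expires.
3. Starts the HA mechanism. For every instance, canal-server and canal-client information is recorded under /otter/canal/destinations. Each canal-server delegates the management of each instance to the ServerRunningMonitor class.
3.1 Initializing ServerRunningMonitor: when ServerRunningMonitors.getRunningMonitor(destination) finds no existing monitor for the destination, the apply method of the Function passed to MigrateMap.makeComputingMap is invoked to create one.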
final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
// set the canal-server information
ServerRunningMonitors.setServerData(serverData);
ServerRunningMonitors
.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
public ServerRunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
// set the instance name
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {
public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
// start the instance
embededCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processActiveExit() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (canalMQStarter != null) {
canalMQStarter.stopDestination(destination);
}
embededCanalServer.stop(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
ip + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
ip + ":" + port);
releaseCid(path);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// trigger creation of the cid node
runningMonitor.init();
return runningMonitor;
}
}));
3.2 Starting ServerRunningMonitor
public synchronized void start() {
super.start();
try {
//3.2.1
processStart();
if (zkClient != null) {
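//3.2.2 to release instance resources as completely as possible, do not watch the running node; otherwise another machine starts immediately even after this one is stopped
// the /otter/canal/destinations/{destination}/running node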
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter();// no ZooKeeper, start directly
}
} catch (Exception e) {
logger.error("start failed", e);
// startup failed; reset the state so it does not interfere with the next start
stop();
}
}
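3.2.1 Calls processStart, which creates the ephemeral "ip:port" node under /otter/canal/destinations/{destination}/cluster and registers an IZkStateListener that watches the ZooKeeper connection state so the node can be recreated on a new session. This ephemeral node is mainly used to give canal-client the list of available canal-server nodes.
3.2.2 Registers dataListener on the /otter/canal/destinations/{destination}/running node to watch for creation, modification, and deletion of that node's data.
3.2.3 Initializes the canal-server information for the instance.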
private void initRunning() {
if (!isStart()) {
return;
}
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// serialize
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter();// trigger the event
mutex.set(true);
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// the node no longer exists, retry immediately
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // try to create the parent node
initRunning();
}
}
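initRunning tries to create the ephemeral node /otter/canal/destinations/{destination}/running. If creation fails, a ZkNodeExistsException is thrown, which means another canal-server is already handling binlog synchronization for this instance; in that case the ephemeral node's data is read and the canal-server currently serving the instance is recorded in activeData.
If creation succeeds, the current canal-server is responsible for binlog synchronization of this instance, and processActiveEnter is called to start the instance.
So the ephemeral node /otter/canal/destinations/{destination}/running indicates which canal-server is currently serving the instance. If that canal-server's ZooKeeper session times out, the ephemeral node is deleted. The dataListener that each canal-server registered on the node then observes the change and performs the active/standby switchover.
// dataListener is initialized when ServerRunningMonitor is constructed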
public ServerRunningMonitor(){
// create the parent node
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // an explicit release was requested and this machine was the active one
release = true;
releaseRunning();// fully release the mainstem
}
activeData = (ServerRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
if (!release && activeData != null && isMine(activeData.getAddress())) {
// if this machine was the last active one, try to re-acquire active status immediately
initRunning();
} else {
// otherwise wait for delayTime, to avoid frequent switchovers caused by brief network interruptions or ZooKeeper problems
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}
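As shown, two events are watched on the ephemeral node /otter/canal/destinations/{destination}/running: node deletion (handleDataDeleted) and node data change (handleDataChange).
When the node is deleted, a server that was the last active one calls initRunning immediately to re-acquire active status. Any other server waits for delayTime before trying, which avoids frequent switchovers caused by brief network interruptions or ZooKeeper problems.
When the node is modified, the listener mainly records in activeData which canal-server is currently active according to the /otter/canal/destinations/{destination}/running node. If an explicit release has occurred, the instance is fully released (the ephemeral node is deleted from ZooKeeper and the instance is shut down; I did not find anywhere in the code that sets release=false).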
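The rule that decides who may re-acquire the running node, and how soon, fits in a single function. The sketch below restates the branch in handleDataDeleted as a pure function so it can be checked in isolation; `FailoverDelay`, `delaySeconds`, and the sample addresses are hypothetical names, and the delayTime of 5 is only an illustrative value.

```java
class FailoverDelay {
    // Seconds a server should wait before trying to re-create the running node
    // after it was deleted. Mirrors the condition in handleDataDeleted:
    // no explicit release, and this server was the last active one.
    static long delaySeconds(boolean released, String lastActiveAddress, String myAddress, long delayTime) {
        boolean wasMine = lastActiveAddress != null && lastActiveAddress.equals(myAddress);
        return (!released && wasMine) ? 0L : delayTime;
    }

    public static void main(String[] args) {
        // Previously active server, no explicit release: retry immediately.
        System.out.println(delaySeconds(false, "10.0.0.1:11111", "10.0.0.1:11111", 5)); // 0
        // Standby server: back off first.
        System.out.println(delaySeconds(false, "10.0.0.1:11111", "10.0.0.2:11111", 5)); // 5
        // Previously active server that released explicitly: back off as well.
        System.out.println(delaySeconds(true, "10.0.0.1:11111", "10.0.0.1:11111", 5));  // 5
    }
}
```

Giving the previous owner a head start means a short network blip normally leaves the instance on the same server instead of bouncing it to a standby.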
To summarize: the ephemeral children of /otter/canal/cluster show how many canal-servers are currently running normally. /otter/canal/destinations/{destination}/cluster shows which canal servers are available for the instance. The data in the ephemeral node /otter/canal/destinations/{destination}/running identifies the active canal-server for the instance. Every running canal-server registers dataListener on that node so that HA switchover happens promptly.
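For reference, the three server-side paths can be written down as a small helper. This is a hedged sketch: `CanalZkPaths` and its method names are hypothetical and only rebuild the paths quoted in this article; canal itself builds them in ZookeeperPathUtils, whose implementation is not reproduced here.

```java
class CanalZkPaths {
    static final String ROOT = "/otter/canal";

    // Ephemeral node announcing a live canal-server, e.g. /otter/canal/cluster/10.33.200.132:11111
    static String serverNode(String address) {
        return ROOT + "/cluster/" + address;
    }

    // Ephemeral node listing a server as available for one instance.
    static String destinationClusterNode(String destination, String address) {
        return ROOT + "/destinations/" + destination + "/cluster/" + address;
    }

    // Ephemeral node whose data names the active server of one instance.
    static String serverRunning(String destination) {
        return ROOT + "/destinations/" + destination + "/running";
    }

    public static void main(String[] args) {
        System.out.println(serverNode("10.33.200.132:11111"));
        System.out.println(destinationClusterNode("example", "10.33.200.132:11111"));
        System.out.println(serverRunning("example"));
    }
}
```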
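Canal Client HA Implementation
In canal, an instance can be consumed by only one client at a time. Next, let's see how canal-client HA is implemented with ZooKeeper. The official project provides a client test class, ClusterCanalClientTest, whose core code carries out the incremental subscribe-and-consume process described in the official documentation:
// obtain canal server addresses dynamically from ZooKeeper and connect; if one server crashes, failover is supported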
CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");
protected void process() {
int batchSize = 5 * 1024;
while (running) {
try {
MDC.put("destination", destination);
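//1. establish the connection
connector.connect();
//2. subscribe; the client submits no filter, so the server-side filter applies
connector.subscribe();
while (running) {
//3. fetch events without specifying a position; the call tries to take batchSize records, returns however many are available, and does not block waiting
// canal remembers this client's latest position.
// On the first fetch, output starts from the oldest record canal has stored.
Message message = connector.getWithoutAck(batchSize); // fetch up to the given number of records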
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
printSummary(message, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // commit the acknowledgement
// connector.rollback(batchId); // processing failed, roll the data back
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
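1. When the canal client connects to the canal server, it reads the information under the cluster and running nodes from ZooKeeper, and ClusterNodeAccessStrategy registers listeners to track changes to that data in real time, which is what enables failover through connection retries.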
public void connect() throws CanalClientException {
while (currentConnector == null) {
int times = 0;
while (true) {
try {
currentConnector = new SimpleCanalConnector(null, username, password, destination) {
@Override
public SocketAddress getNextAddress() {
return accessStrategy.nextNode();
}
};
currentConnector.setSoTimeout(soTimeout);
currentConnector.setIdleTimeout(idleTimeout);
if (filter != null) {
currentConnector.setFilter(filter);
}
if (accessStrategy instanceof ClusterNodeAccessStrategy) {
//1.1
currentConnector.setZkClientx(((ClusterNodeAccessStrategy) accessStrategy).getZkClient());
}
currentConnector.connect();
break;
} catch (Exception e) {
logger.warn("failed to connect to:{} after retry {} times", accessStrategy.currentNode(), times);
currentConnector.disconnect();
currentConnector = null;
// retry for #retryTimes for each node when trying to
// connect to it.
times = times + 1;
if (times >= retryTimes) {
throw new CanalClientException(e);
} else {
// fixed issue #55: sleep between retries to avoid high CPU usage while reconnecting
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e1) {
throw new CanalClientException(e1);
}
}
}
}
}
}
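The connect loop above asks the access strategy for the next node on every attempt. As a rough illustration of how such a strategy can choose, here is a minimal sketch that assumes the strategy prefers the server named by the running node and otherwise falls back to a node from the cluster list. `SimpleAccessStrategy` and its method names are hypothetical; this is not the code of ClusterNodeAccessStrategy, and the real class is fed by ZooKeeper listeners rather than direct method calls.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SimpleAccessStrategy {
    private volatile InetSocketAddress runningAddress;                              // from the running node
    private volatile List<InetSocketAddress> clusterAddresses = new ArrayList<>(); // from the cluster node

    // In a real client these two would be invoked from ZooKeeper listeners.
    void onRunningChanged(InetSocketAddress address) {
        runningAddress = address;
    }

    void onClusterChanged(List<InetSocketAddress> addresses) {
        clusterAddresses = new ArrayList<>(addresses);
    }

    // Prefer the active server; otherwise fall back to any known cluster node.
    InetSocketAddress nextNode() {
        InetSocketAddress running = runningAddress;
        if (running != null) {
            return running;
        }
        List<InetSocketAddress> cluster = clusterAddresses;
        if (!cluster.isEmpty()) {
            return cluster.get(0);
        }
        throw new IllegalStateException("no alive canal server");
    }

    public static void main(String[] args) {
        SimpleAccessStrategy strategy = new SimpleAccessStrategy();
        InetSocketAddress a = InetSocketAddress.createUnresolved("10.0.0.1", 11111);
        InetSocketAddress b = InetSocketAddress.createUnresolved("10.0.0.2", 11111);
        strategy.onClusterChanged(Arrays.asList(a, b));
        System.out.println(strategy.nextNode().getPort());     // falls back to a cluster node
        strategy.onRunningChanged(b);
        System.out.println(strategy.nextNode().getHostName()); // now the active server
    }
}
```

Because the two fields are refreshed by listeners, a retry after a failed connection naturally picks up the newly elected server.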
1.1 Sets the zkClient of the current connector and initializes the ClientRunningMonitor that will register the client in ZooKeeper.
public void setZkClientx(ZkClientx zkClientx) {
this.zkClientx = zkClientx;
initClientRunningMonitor(this.clientIdentity);
}
private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) {
if (zkClientx != null && clientIdentity != null && runningMonitor == null) {
ClientRunningData clientData = new ClientRunningData();
clientData.setClientId(clientIdentity.getClientId());
clientData.setAddress(AddressUtils.getHostIp());
runningMonitor = new ClientRunningMonitor();
runningMonitor.setDestination(clientIdentity.getDestination());
runningMonitor.setZkClient(zkClientx);
runningMonitor.setClientData(clientData);
runningMonitor.setListener(new ClientRunningListener() {
public InetSocketAddress processActiveEnter() {
InetSocketAddress address = doConnect();
mutex.set(true);
if (filter != null) { // a filter exists, so this is an automatic switchover; resubscribe with the previous filter
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
return address;
}
public void processActiveExit() {
mutex.set(false);
doDisconnect();
}
});
}
}
ClientRunningMonitor is similar to ServerRunningMonitor; it is how the client manages its claim on an instance. ClientRunningMonitor is started when the connection is actually established in currentConnector.connect():
public void connect() throws CanalClientException {
if (connected) {
return;
}
if (runningMonitor != null) {
if (!runningMonitor.isStart()) {
// start ClientRunningMonitor
runningMonitor.start();
}
} else {
waitClientRunning();
if (!running) {
return;
}
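// pick the canal server currently working for this instance from ClusterNodeAccessStrategy and connect to it
doConnect();
if (filter != null) { // a filter exists, so this is an automatic switchover; resubscribe with the previous filter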
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
}
connected = true;
}
As shown, ClientRunningMonitor is started before the connection to the canal server is established, in order to acquire the right to consume the instance.
//ClientRunningMonitor.start
public void start() {
super.start();
String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
}
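The principle is the same as for ServerRunningMonitor: dataListener is registered on the running node that represents the instance's client, /otter/canal/destinations/{destination}/{clientId}/running, and it handles client HA switchover by watching changes to the node's data.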
public ClientRunningMonitor(){
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // an explicit release was requested and this machine was the active one
release = true;
releaseRunning();// fully release the mainstem
}
activeData = (ClientRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
// trigger an exit; the cause may be a manual release or a session expiry caused by a brief network interruption
processActiveExit();
if (!release && activeData != null && isMine(activeData.getAddress())) {
// if this machine was the last active one, try to re-acquire active status immediately
initRunning();
} else {
// otherwise wait for delayTime, to avoid frequent switchovers caused by brief network interruptions or ZooKeeper problems
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}
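The ephemeral node is created in ClientRunningMonitor.initRunning; only a client that creates it successfully can connect to the canal server. The data written to the ephemeral node is the client's IP, port, and clientId.
// 1. synchronized on the method so that calls execute one at a time, in order;
// 2. check whether the activeData already in ZooKeeper belongs to this machine; if so, reset mutex to true, otherwise a deadlock results
// 3. added exception handling so that the running node is deleted when an exception occurs, otherwise a deadlock results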
public synchronized void initRunning() {
if (!isStart()) {
return;
}
String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
// serialize
byte[] bytes = JsonUtils.marshalToByte(clientData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
processActiveEnter();// trigger the event and establish the connection to the canal server
activeData = clientData;
mutex.set(true);
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// the node no longer exists, retry immediately
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
// the node already exists; check whether it is ours, to avoid a livelock
if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
mutex.set(true);
}
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
true); // try to create the parent node
initRunning();
} catch (Throwable t) {
logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
destination),
t);
// fixed issue 1220: avoid a busy loop when no server node is working
if (t instanceof ServerNotFoundException) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
// on any exception, try to release
releaseRunning();
throw new CanalClientException("something goes wrong in initRunning method. ", t);
}
}
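The listings toggle a `mutex` flag whenever the client gains or loses the running node, and connect() calls waitClientRunning() before talking to the server. The excerpts do not show what the mutex is or what waitClientRunning() does, so the following is only a sketch under the assumption that callers block until the flag becomes true. `RunningGate` and its methods are hypothetical names, not canal classes.

```java
class RunningGate {
    private boolean open; // true while this client holds the running node

    // Called when the client gains (true) or loses (false) the running node.
    synchronized void set(boolean value) {
        open = value;
        if (open) {
            notifyAll(); // wake up callers blocked in await
        }
    }

    // Blocks until the gate is open or the timeout elapses; returns whether it is open.
    synchronized boolean await(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!open) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RunningGate gate = new RunningGate();
        System.out.println(gate.await(50)); // false: not active yet
        gate.set(true);
        System.out.println(gate.await(50)); // true: active, calls may proceed
    }
}
```

With a gate like this, a standby client simply parks inside its get/ack calls until it wins the running node, so application code needs no extra coordination.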
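2. After the client has connected to the server, it sends a SUBSCRIPTION request to the server. subscribe mainly tells the canal server which filter to apply to the binlog data from the database, and also passes the current clientIdentity to the server. Because an instance can correspond to only one client, clientIdentity is always initialized as: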
this.clientIdentity = new ClientIdentity(destination, (short) 1001);
public ClientIdentity(String destination, short clientId){
this.clientId = clientId;
this.destination = destination;
}
public void subscribe(String filter) throws CanalClientException {
waitClientRunning();
if (!running) {
return;
}
try {
writeWithHeader(Packet.newBuilder()
.setType(PacketType.SUBSCRIPTION)
.setBody(Sub.newBuilder()
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFilter(filter != null ? filter : "")
.build()
.toByteString())
.build()
.toByteArray());
//
Packet p = Packet.parseFrom(readNextPacket());
Ack ack = Ack.parseFrom(p.getBody());
if (ack.getErrorCode() > 0) {
throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
}
clientIdentity.setFilter(filter);
} catch (IOException e) {
throw new CanalClientException(e);
}
}
When the canal server receives a SUBSCRIPTION request, it registers the client information clientIdentity with the corresponding instance:
//embeddedServer.subscribe(clientIdentity)
/**
* Client subscription; a repeated subscription updates the corresponding filter
*/
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
if (!canalInstance.getMetaManager().isStart()) {
canalInstance.getMetaManager().start();
}
//1.
canalInstance.getMetaManager().subscribe(clientIdentity); // perform the meta subscription
Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
if (position == null) {
position = canalInstance.getEventStore().getFirstPosition();// get the first position in the store
if (position != null) {
//2.
canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
}
logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
} else {
logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
}
// notify that the subscription has changed
canalInstance.subscribeChange(clientIdentity);
}
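The main logic is:
1. Perform the meta subscription. metaManager manages information such as the client's consumption position. In HA mode the client's position, filter, and related information are stored in ZooKeeper so that they can be shared when the canal server switches over.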
//ZooKeeperMetaManager.subscribe
public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
clientIdentity.getClientId());
try {
zkClientx.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
// ignore
}
// if the client has a filter, create the persistent node /otter/canal/destinations/{destination}/{clientId}/filter to hold it
if (clientIdentity.hasFilter()) {
String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
byte[] bytes = null;
try {
bytes = clientIdentity.getFilter().getBytes(ENCODE);
} catch (UnsupportedEncodingException e) {
throw new CanalMetaManagerException(e);
}
try {
zkClientx.createPersistent(filterPath, bytes);
} catch (ZkNodeExistsException e) {
// ignore
zkClientx.writeData(filterPath, bytes);
}
}
}
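If the client has a filter, the persistent node /otter/canal/destinations/{destination}/{clientId}/filter is created to hold the client's filter.
2. If the client has no cursor yet and the event store has a first position, that position becomes the client's cursor; a background scheduled task then flushes the cursor to the persistent node /otter/canal/destinations/{destination}/{clientId}/cursor.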
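The idea of updating the cursor in memory and letting a background task persist it can be sketched as follows. This is an illustration under stated assumptions, not canal's PeriodMixedMetaManager: `PeriodicCursorStore`, its methods, and the sample position string are hypothetical, and a plain map stands in for the ZooKeeper cursor node.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicCursorStore {
    private final Map<String, String> memory = new ConcurrentHashMap<>();  // latest cursor per client
    private final Map<String, String> durable = new ConcurrentHashMap<>(); // stands in for the ZooKeeper cursor node
    private final Set<String> dirty = ConcurrentHashMap.newKeySet();       // clients with unflushed cursors

    // Cheap in-memory update on every ack; persistence is deferred.
    void updateCursor(String clientKey, String position) {
        memory.put(clientKey, position);
        dirty.add(clientKey);
    }

    String durableCursor(String clientKey) {
        return durable.get(clientKey);
    }

    // Writes every cursor changed since the last flush to the durable store.
    void flush() {
        for (String clientKey : dirty) {
            if (dirty.remove(clientKey)) {
                durable.put(clientKey, memory.get(clientKey));
            }
        }
    }

    // Schedules flush() at a fixed period; the caller shuts the executor down.
    ScheduledExecutorService startFlusher(long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) {
        PeriodicCursorStore store = new PeriodicCursorStore();
        store.updateCursor("example/1001", "mysql-bin.000001:4");
        System.out.println(store.durableCursor("example/1001")); // null, nothing flushed yet
        store.flush();                                           // what the background task would do
        System.out.println(store.durableCursor("example/1001")); // mysql-bin.000001:4
    }
}
```

The trade-off of this design is that a crash can lose the acks made since the last flush, so a client may see some already-processed batches again after a failover.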
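To summarize: canal client HA is likewise guaranteed by an ephemeral node plus a listener watching it. Before a canal client connects to a canal server, it creates the ephemeral node /otter/canal/destinations/{destination}/{clientId}/running; the client that creates it successfully writes its IP, port, and clientId there, marking itself as the active client for the instance. Every canal client also registers dataListener on the node and handles client HA switchover by watching changes to the node's data. The active client obtains canal server information from ZooKeeper through ClusterNodeAccessStrategy, learns which canal server is active for the instance, and connects to it. The client then sends a SUBSCRIPTION request to the canal server; the client's position, filter, and related information, if present, are registered in ZooKeeper so they can be shared when the canal server switches over.
This concludes the analysis of canal's HA mode. The next article will analyze how the binlog position is determined while canal is running.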