canal.client 源码分析

一、整体设计

1.1 功能概述

封装与 Canal Server 进行交互的客户端,提供两种实现给外部使用:

  • 简单连接:直接通过 socket 与 server 进行交互,实现连接、订阅、批量获取、提交和回滚等操作。
  • 有 HA 的 Cluster 连接:基于简单连接方式进行封装,通过 ZooKeeper 实现 client 端的 HA。

1.2 目录结构

项目基本结构.png

其中 kafka 包和 rocketmq 包实现了接收到的消息直接发送到消息队列中,目前先重点关注与服务端的交互和消息的处理流程,后续会重点关注 kafka 包的实现,为目前生产上 C/S 端分离 ==> 集成 的改造做准备。

1.3 核心类

核心类类图.jpg
  • ClientIdentity
    canal client 和 server 交互之间的身份标识,目前 clientId 写死为 <u>1001</u>(目前 canal server 上的一个 instance 只能有一个 client 消费,clientId 的设计是为 1 个 instance 多 client 消费模式而预留的,暂时不需要理会)。
  • CanalConnector
    SimpleCanalConnector / ClusterCanalConnector:两种 connector 的实现,simple 针对的是简单的 IP 直连模式,cluster 针对多 IP 的模式,可依赖 CanalNodeAccessStrategy 进行 failover 控制。
  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy / ClusterNodeAccessStrategy:两种 failover 的实现,simple 针对给定的初始 IP 列表进行failover选择,cluster 基于 ZooKeeper 上的 cluster 节点动态选择正在运行的 canal server。
  • ClientRunningMonitor / ClientRunningListener / ClientRunningData
    client running 相关控制,主要为解决 client 自身的 failover 机制。canal client 允许同时启动多个 canal client,通过 running 机制,可保证只有一个 client 在工作,其他 client 做为冷备。当运行中的 client 挂了,running 会控制让冷备中的 client 转为工作模式,这样就可以确保 canal client 也不会是单点。保证整个系统的高可用性。ClientRunningData 对应 ZooKeeper 上的 /otter/canal/destinations/xxx/1001 节点数据。

二、类设计

2.1 CanalConnector

2.1.1 接口介绍

CanalConnector 的作用是与服务端进行交互,支持连接、订阅、获取数据、回滚、断开等操作。一个 CanalConnector 对应一个指定的目标库(destination),如果需要订阅多个目标库需要创建多个 CanalConnector。

以下是 CanalConnector 接口的方法介绍:

CanalConnector.png
  • void connect():与 Canal 服务端建立连接。
  • void disconnect():与 Canal 服务端断开连接。
  • boolean checkValid():检查连接是否合法。
    • 连接服务端失败,一直没有一个可用的连接时,返回 false
    • 当前客户端在进行 running 节点抢占时,作为备份节点存在,并非作为工作节点,返回 false
  • void subcribe():订阅服务端,可以传入一个 filter 字符串用于过滤物理库或表。
  • void unsubscribe():取消订阅服务端。
  • Message getWithoutAck():不需要指定 position 就可以获取服务端数据,Canal 会记住此 client 的最新 position。如果是第一次fetch,则会从 Canal 中保存的最老一条数据开始输出。
    • (int batchSize):从服务端获取 batchSize 大小的数据,有多少取多少,不会阻塞等待。
    • (int batchSize, int timeout, TimeUnit unit):从服务端获取 batchSize 大小的数据,阻塞等待直到拿够 batchSize 条记录或者等待时间达到 timeout。
  • Message get():从服务端获取订阅的消费数据,在执行 getWithoutAck 之后 自动提交确认。
    • (int batchSize):从服务端获取 batchSize 大小的数据,有多少取多少,不会阻塞等待
    • (int batchSize, int timeout, TimeUnit unit):从服务端获取 batchSize 大小的数据,阻塞等待直到拿够 batchSize 条记录或者等待时间达到 timeout。
  • void ack(int batchId):通过传入 batchId 向服务端确认消息已经消费成功,小于等于此 batchId 的 Message 都会被确认。
  • void rollback(int batchId):基于 batchId 回滚对应的 get/getWithoutAck 请求,重新获取一次数据。

2.2.2 接口实现

CanalConnector 接口有两种实现,分别是 SimpleCanalConnector 和 ClusterCanalConnector。前者负责通过指定地址直接连接服务端,后者则可以通过指定一个地址列表或是 ZK 地址来连接高可用的服务端集群。

2.2.2.1 SimpleCanalConnector

该类负责实现与服务端的交互逻辑,包括与服务端进行握手、认证、发送与接收数据包等,以及自身运行状态的控制。下面以 connect 方法为例,举例分析该类与服务端的交互流程。

SimpleCanalConnector#connect 时序图.png

首先,程序会执行 waitClientRunning() 检测自己当前的角色,如果自己是工作节点,那么继续运行,如果是备用节点则线程挂起,等待角色切换,如果是单节点直连则直接设置自己为运行状态并返回。

接下来,程序会执行 doConnect() 方法开始执行真正的连接过程。

  • 第一步,建立 socket 连接:开启一个 SocketChannel,并设置 socket 的连接超时时间,然后与服务端 TCP 三次握手建立连接。
  • 第二步,与服务端进行握手认证:读取并解析服务端发送的报文,其中报文头包含了协议的版本、类型等,报文体包含了握手所需要的信息,如压缩方式、认证需要的 seed 等。调用加密工具类,用 seed 加密传入的 password,随后生成一个 ClientAuth 类,设置 username、password 密文等信息,并将它组装为一个客户端认证报文,发送给服务端。接收并解析服务端的响应报文,判断是否认证成功,若成功则继续设置当前状态为已连接(connected = true),返回本地 socket 地址对象,若失败则抛出异常。

连接建立后,随后的 subscribe/unsubscribe、get/getWithoutAck、ack/rollback 等操作无需再执行握手认证,这些操作实际上大同小异,无非是根据传入参数,向服务端发送一个相应的报文(指定报文类型、构建报文体),然后解析响应报文,返回或执行后续操作。客户端操作、报文头、报文类型的对应关系如下:

方法名 报文类型 报文体格式类 含义
doConnect PacketType.HandShake
PacketType.CLIENTAUTHENTICATION
PacketType.Ack
-
ClientAuth
Ack
握手
客户端认证
服务端确认
subscribe PacketType.SUBSCRIPTION Sub 订阅
unsubscribe PacketType.UNSUBSCRIPTION Unsub 取消订阅
get/getWithoutAck PacketType.GET Get 获取数据
ack PacketType.CLIENTACK ClientAck 客户端确认
rollback PacketType.CLIENTROLLBACK ClientRollback 客户端回滚

2.2.2.2 ClusterCanalConnector

1.4 核心类 中所介绍,ClusterCanalConnector 基于 SimpleCanalConnector、CanalNodeAccessStrategy 以及 impl.running 包内的类来实现客户端连接 HA 的服务端以及客户端自身的 HA。

ClusterCanalConnector 内部维护了一个 currentConnector 对象,这是 SimpleCanalConnector 的一个实例,并且是当前正在工作的实例。类似后者,该类同样实现了 CanalConnector 与用于和服务端交互的方法,其实现方式为内部调用 currentConnecotr 的对象方法来真正地交互,而它自己只负责维护 currentConnector 即可。

该类支持 SimpleNodeAccessStrategy 和 ClusterNodeAccessStrategy 两种节点访问策略,前者需要手动指定服务端地址的列表,而后者基于 ZK 注册与发现,监听 ZK 上的节点,自动更新内存中的地址列表和当前工作节点地址。

server-HA-zknodes.png

以 ClusterNodeAccessStrategy 为例,它维护了 currentAddress 和 runningAddress 两个属性。在构造器初始化时,程序会去订阅监听 ZK 上的 destination/cluster 节点和 destination/running 节点,前者包含服务端地址列表,后者则是正在工作的服务端节点地址,分别对应它的两个属性。当上述两个节点的数据更改或者删除后,程序会对这两个属性做相应的变更,具体如下图所示。

server-HA-connect-workflow.png

三、HA 实现

3.1 实现原理

服务端和客户端 HA 的实现都是利用了 ZooKeeper 可作为分布式锁的特性。

3.1.1 ZooKeeper 分布式锁原理

  • 临时节点(EPHEMERAL):与客户端会话绑定,一旦客户端会话失效(如宕机),这个客户端所创建的所有临时节点都会被移除。

  • Watcher 机制:

ZooKeeper-Watcher.jpg

ZooKeeper 客户端向 ZooKeeper 服务器注册 watcher 的同时,会将 watcher 对象存储在客户端的 WatcherManager;ZooKeeper 服务器触发 watcher 事件后,会向客户端发送通知,客户端线程从 WatcherManager 中回调 watcher 执行相应的功能。

3.1.2 Client HA 具体实现

client-HA-zknodes.png
  • 1001/cursor:最新的 client 确认消费的信息

    zk-1001-cursor.png

  • 1001/filter:client 订阅的过滤规则

    zk-1001-filter.png

  • 1001/running:正在运行中的 client 地址

    zk-1001-running.png

client 启动时会订阅 running 节点的变更事件,然后去尝试创建 running 节点:

  • 创建成功则将自己的信息写入 running 节点,然后继续运行;
  • 创建失败则线程挂起,等待 running 节点释放,然后再次尝试创建 running 节点。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355