client的工作过程,需要我们自己去编写对应的逻辑,我们目前只能从example写的例子来看。目前examle中提供了两个例子,一个是单机的,一个是集群的cluster,我们后续如果需要进行开发的话,其实也是开发我们自己的client,以及client的一些逻辑。我们主要看下集群的client是如何实现和消费的,又是怎么和server进行数据交互的。
我们来看看具体的代码:
protected void process() {
int batchSize = 5 * 1024;
while (running) {
try {
MDC.put("destination", destination);
connector.connect();
connector.subscribe();
waiting = false;
while (running) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
printSummary(message, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
这个的这样的过程是这样的
- 连接,connector.connect()
- 订阅,connector.subscribe
- 获取数据,connector.getWithoutAck()
- 业务处理
- 提交确认,connector.ack()
- 回滚,connector.rollback()
- 断开连接,connector.disconnect()
我们具体来看下。
一、建立连接
CanalConnector主要有两个实现,一个是SimpleCanalConnector,一个是ClusterCanalConnector,我们主要看下ClusterCanalConnector,这也是我们要用的一个模式。
我们用的时候,通过一个工厂类生成我们需要的Connector,这里的工厂类是CanalConnectors,里面包含了生成ClusterCanalConnector的方法。
public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
String password) {
ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
password,
destination,
new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
canalConnector.setSoTimeout(30 * 1000);
return canalConnector;
}
用到的参数有zk的地址,canal的名称,数据库的账号密码。里面有个ClusterNodeAccessStrategy是用来选择client的策略,这个ClusterNodeAccessStrategy的构造方法里面有些东西需要我们关注下。
1.1 ClusterNodeAccessStrategy
public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
this.zkClient = zkClient;
childListener = new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
initClusters(currentChilds);
}
};
dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
runningAddress = null;
}
public void handleDataChange(String dataPath, Object data) throws Exception {
initRunning(data);
}
};
String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
this.zkClient.subscribeChildChanges(clusterPath, childListener);
initClusters(this.zkClient.getChildren(clusterPath));
String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
this.zkClient.subscribeDataChanges(runningPath, dataListener);
initRunning(this.zkClient.readData(runningPath, true));
}
这边起了两个监听器,都是监听server端的活动服务器的。一个是获取所有的server列表,一个是获取活动的server服务器,都是从zk的对应节点上去取的。
1.2 连接connect
获取到CanalConnector之后,就是真正的连接了。在ClusterCanalConnector中,我们可以看到,其实他底层用的也是SimpleCanalConnector,只不过加了一个选择的策略。
public void connect() throws CanalClientException {
if (connected) {
return;
}
if (runningMonitor != null) {
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
} else {
waitClientRunning();
if (!running) {
return;
}
doConnect();
if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
}
connected = true;
}
如果是集群模式的客户端,那么这边的runningMonitor不为空,因为他进行了初始化。我们主要看下runningMonitor.start()里面的操作。
public void start() {
super.start();
String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
}
这边监听的路径是:/otter/canal/destinations/{destination}/{clientId}/running。如果有任何的变化,或节点的删除,那么执行dataListener里面的操作。
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
release = true;
releaseRunning();// 彻底释放mainstem
}
activeData = (ClientRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
// 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
processActiveExit();
if (!release && activeData != null && isMine(activeData.getAddress())) {
// 如果上一次active的状态就是本机,则即时触发一下active抢占
initRunning();
} else {
// 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
这里的注释比较清楚,基本上如果数据发生了变化,那么进行节点释放后,将运行节点置为活动节点。如果发生了数据删除,那么直接触发退出,如果上一次的active状态是本机,那么触发一下active抢占,否则等待delayTime,默认5s后重试。下面我们主要看下initRunning。
1.3 initRunning
这块主要是创建运行节点的临时节点。节点路径是/otter/canal/destinations/{destination}/{clientId},节点内容是ClientRunningData的json序列化结果。连接的代码:
public InetSocketAddress processActiveEnter() {
InetSocketAddress address = doConnect();
mutex.set(true);
if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
return address;
}
这块有几段逻辑,我们慢慢看下。
1.3.1 doConnect
这里是client直接连上了server,通过socket连接,也就是server暴露的socket端口。
private InetSocketAddress doConnect() throws CanalClientException {
try {
channel = SocketChannel.open();
channel.socket().setSoTimeout(soTimeout);
SocketAddress address = getAddress();
if (address == null) {
address = getNextAddress();
}
channel.connect(address);
readableChannel = Channels.newChannel(channel.socket().getInputStream());
writableChannel = Channels.newChannel(channel.socket().getOutputStream());
Packet p = Packet.parseFrom(readNextPacket());
if (p.getVersion() != 1) {
throw new CanalClientException("unsupported version at this client.");
}
if (p.getType() != PacketType.HANDSHAKE) {
throw new CanalClientException("expect handshake but found other type.");
}
//
Handshake handshake = Handshake.parseFrom(p.getBody());
supportedCompressions.addAll(handshake.getSupportedCompressionsList());
//
ClientAuth ca = ClientAuth.newBuilder()
.setUsername(username != null ? username : "")
.setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
.setNetReadTimeout(soTimeout)
.setNetWriteTimeout(soTimeout)
.build();
writeWithHeader(Packet.newBuilder()
.setType(PacketType.CLIENTAUTHENTICATION)
.setBody(ca.toByteString())
.build()
.toByteArray());
//
Packet ack = Packet.parseFrom(readNextPacket());
if (ack.getType() != PacketType.ACK) {
throw new CanalClientException("unexpected packet type when ack is expected");
}
Ack ackBody = Ack.parseFrom(ack.getBody());
if (ackBody.getErrorCode() > 0) {
throw new CanalClientException("something goes wrong when doing authentication: "
+ ackBody.getErrorMessage());
}
connected = true;
return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
} catch (IOException e) {
throw new CanalClientException(e);
}
}
这边采用NIO编程,建立和server的socket连接后,发送了握手包和认证包,当收到ack包后,认为连接成功。认证包的服务端处理在ClientAuthenticationHandler类中,握手处理在HandshakeInitializationHandler类。
server接收到认证的消息后,会做如下的处理:
public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
switch (packet.getVersion()) {
case SUPPORTED_VERSION:
default:
final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
// 如果存在订阅信息
if (StringUtils.isNotEmpty(clientAuth.getDestination())
&& StringUtils.isNotEmpty(clientAuth.getClientId())) {
ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
Short.valueOf(clientAuth.getClientId()),
clientAuth.getFilter());
try {
MDC.put("destination", clientIdentity.getDestination());
embeddedServer.subscribe(clientIdentity);
ctx.setAttachment(clientIdentity);// 设置状态数据
// 尝试启动,如果已经启动,忽略
if (!embeddedServer.isStart(clientIdentity.getDestination())) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
} finally {
MDC.remove("destination");
}
}
NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
//忽略
}
});
break;
}
}
主要的逻辑在subscribe里面。如果metaManager没有启动,那么需要进行启动。启动时,会从zk节点下面拉取一些数据,包括客户端的消费位点情况等等。然后就是订阅,订阅是新建一个zk节点,路径为/otter/canal/destinations/{destination}/{clientId}。然后还有一些过滤器,也需要写到zk中。之后就是获取一下本client的位点信息,如果原来zk中包含,那么直接从内存中获取,否则取eventStore的第一条数据。
1.3.2 subscribe
发送订阅消息给server,通过socket的方式。这边是判断,如果filter不为空,才发送订阅消息。服务端的处理过程是这样的:
case SUBSCRIPTION:
Sub sub = Sub.parseFrom(packet.getBody());
if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
clientIdentity = new ClientIdentity(sub.getDestination(),
Short.valueOf(sub.getClientId()),
sub.getFilter());
MDC.put("destination", clientIdentity.getDestination());
// 尝试启动,如果已经启动,忽略
if (!embeddedServer.isStart(clientIdentity.getDestination())) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
embeddedServer.subscribe(clientIdentity);
ctx.setAttachment(clientIdentity);// 设置状态数据
NettyUtils.ack(ctx.getChannel(), null);
} else {
NettyUtils.error(401,
MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
ctx.getChannel(),
null);
}
break;
类似于connect的过程,不过这边带上了filter的参数。这边启动了server以及他的监听器。
1.3.3 rollback
这里的回滚是指回滚server端记录的本client的位点信息。
public void rollback() throws CanalClientException {
waitClientRunning();
rollback(0);// 0代笔未设置
}
这里发送了rollback的指令。服务端是这么处理的:
case CLIENTROLLBACK:
ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
MDC.put("destination", rollback.getDestination());
if (StringUtils.isNotEmpty(rollback.getDestination())
&& StringUtils.isNotEmpty(rollback.getClientId())) {
clientIdentity = new ClientIdentity(rollback.getDestination(),
Short.valueOf(rollback.getClientId()));
if (rollback.getBatchId() == 0L) {
embeddedServer.rollback(clientIdentity);// 回滚所有批次
} else {
embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
}
} else {
NettyUtils.error(401,
MessageFormatter.format("destination or clientId is null", rollback.toString())
.getMessage(),
ctx.getChannel(),
null);
}
break;
这里的batchId传入的是0,也就是要回滚所有的批次。我们来看下这个回滚的动作:
@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
// 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
if (!hasSubscribe) {
return;
}
synchronized (canalInstance) {
// 清除batch信息
canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
// rollback eventStore中的状态信息
canalInstance.getEventStore().rollback();
logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
}
}
这里回滚的,其实是eventStore中的指针,把get的指针设置为之前ack的指针。
二、订阅数据
当client连接server完成后,就需要进行binlog数据的订阅。
public void subscribe() throws CanalClientException {
subscribe(""); // 传递空字符即可
}
public void subscribe(String filter) throws CanalClientException {
int times = 0;
while (times < retryTimes) {
try {
currentConnector.subscribe(filter);
this.filter = filter;
return;
} catch (Throwable t) {
if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
logger.info("block waiting interrupted by other thread.");
return;
} else {
logger.warn(String.format(
"something goes wrong when subscribing from server: %s",
currentConnector != null ? currentConnector.getAddress() : "null"),
t);
times++;
restart();
logger.info("restart the connector for next round retry.");
}
}
}
throw new CanalClientException("failed to subscribe after " + times + " times retry.");
}
订阅这块的内容不再赘述,在上面的connect过程中有提到。这边还有一个失败重试的机制,当异常不是中断异常的情况下,会重试重启client connector,直到达到了阈值retryTimes。
三、获取数据
在建立连接和进行数据订阅之后,就可以开始进行binlog数据的获取了。主要的方法是getWithOutAck这个方法,这种是需要client自己进行数据ack的,保证了只有数据真正的被消费,而且进行了业务逻辑处理之后,才会ack。当然,如果有了异常,也会进行一定次数的重试和重启。
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
waitClientRunning();
try {
...//忽略
writeWithHeader(Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteString())
.build()
.toByteArray());
return receiveMessages();
} catch (IOException e) {
throw new CanalClientException(e);
}
}
我们可以看到,其实是发送了一个GET命令给server端,然后传递了一个参数batchSize,还有超时时间,而且不是自动提交的。服务端的处理是这样的:
embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
也是调用的这个方法:
@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
synchronized (canalInstance) {
// 获取到流式数据中的最后一批获取的位置
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
Events<Event> events = null;
if (positionRanges != null) { // 存在流数据
events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
} else {// ack后第一次获取
Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
if (start == null) { // 第一次,还没有过ack记录,则获取当前store中的第一条
start = canalInstance.getEventStore().getFirstPosition();
}
events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
}
if (CollectionUtils.isEmpty(events.getEvents())) {
logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
clientIdentity.getClientId(), batchSize);
return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
} else {
// 记录到流式信息
Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
public Entry apply(Event input) {
return input.getEntry();
}
});
if (logger.isInfoEnabled()) {
logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
clientIdentity.getClientId(),
batchSize,
entrys.size(),
batchId,
events.getPositionRange());
}
return new Message(batchId, entrys);
}
}
}
最主要的逻辑在这里:
- 判断canalInstance是否已经启动:checkStart
- 判断订阅列表中是否包含当前的client:checkSubscribe
- 根据client信息从metaManager中获取最后消费的批次:getLastestBatch,这块在运行起来后,是从内存中取的,但是在instance启动时,是从zk中拉取的,是从/otter/canal/destinations/{destination}/{clientId}/mark下面获取的,后续也会定时(1s)刷新到这里面
- 如果能获取到消费的批次,直接从eventStore的队列中获取数据。
- 如果positionRanges为空,那么从metaManager中获取指针。如果指针也没有,说明原来没有ack过数据,需要从store中第一条开始获取。这个过程其实就是找start,也就是上一次ack的位置。
- 调用getEvent,获取数据。根据传入的参数不同,调用不同的方法去获取数据,但是最终都是调用的goGet方法。这个doGet方法不是很复杂,主要是根据参数从store队列中获取数据,然后把指针进行新的设置。
- 如果没有取到binlog数据,那么直接返回,批次号为-1。
- 如果取到了数据,记录一下流式数据后返回。
结果封装在Messages中,最终改为Message,包含批次号和binlog列表。
四、业务处理
拿到message后,需要进行判断batchId,如果batchId=-1或者binlog大小为0,说明没有拿到数据。否则在message基础上进行逻辑处理。
Message的内容,后续我们再进行讨论。
五、提交确认
connector.ack(batchId); // 提交确认
提交批次id,底层发送CLIENTACK命令到server。server调用CanalServerWithEmbedded的ack方法来进行提交。
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
PositionRange<LogPosition> positionRanges = null;
positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
if (positionRanges == null) { // 说明是重复的ack/rollback
throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// 更新cursor
if (positionRanges.getAck() != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
if (logger.isInfoEnabled()) {
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
// 可定时清理数据
canalInstance.getEventStore().ack(positionRanges.getEnd());
}
首先更新metaManager中的batch,然后更新ack指针,同时清理store中到ack指针位置的数据。
六、回滚
如果有失败的情况,需要进行回滚。发送CLIENTROLLBACK命令给server端,进行数据回滚。回滚单个批次时的处理逻辑是这样的:
@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
// 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
if (!hasSubscribe) {
return;
}
synchronized (canalInstance) {
// 清除batch信息
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
batchId);
if (positionRanges == null) { // 说明是重复的ack/rollback
throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// lastRollbackPostions.put(clientIdentity,
// positionRanges.getEnd());// 记录一下最后rollback的位置
// TODO 后续rollback到指定的batchId位置
canalInstance.getEventStore().rollback();// rollback
// eventStore中的状态信息
logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
这里的rollback到指定的batchId,其实是假的。他的rollback也是全量回滚到ack的指针位置。
七、断开连接
在发生异常情况时,client会断开与server的连接,也就是disconnect方法。
public void disconnect() throws CanalClientException {
if (rollbackOnDisConnect && channel.isConnected()) {
rollback();
}
connected = false;
if (runningMonitor != null) {
if (runningMonitor.isStart()) {
runningMonitor.stop();
}
} else {
doDisconnnect();
}
}
判断是否在断开连接的时候回滚参数(默认false)和当前socket通道是否连接中,进行回滚。
否则调用runningMonitor.stop方法进行停止。主要的过程是这样的:
- 取消监听/otter/canal/destinations/{destination}/{clientId}/running/节点变化信息
- 删除上面这个节点
- 关闭socket的读通道
- 关闭socket的写通道
- 关闭socket channel