1 消费异常关键代码分析
1.1 获取消息
Message message = canalConnector.getWithoutAck(BatchSize());
long batchId = message.getId();
getWithoutAck(batchSize)是基于所谓流式调用的,应该为了支持多线程调用提高消费的效率,但是这在不觉间就有可能埋下调用的疑问,尤其是在消费message时出异常时,客户端需要重试消费;
1.2 处理message出现异常,重试处理
重试处理的只能是对出现异常的message进行再次处理,不能再次调用getWithoutAck(batchSize)
1.3 重试处理message失败后的处理
可以rollback()或者默认处理即告诉canal服务端这一批数据处理成功,可以删除
2 与canal服务端建立连接
2.1 原因分析:
canal client与canal服务端建立连接成功后才能接收服务端发来的message进行消费,但是客户端并不能保证一定能与服务端建立连接或者说一直保持连接,因为难免出现网络问题或者服务宕机的情况,为此客户端为了更为健壮,需要重点处理建立连接的逻辑
2.2 建立连接关键代码
解决的思路很简单,调用CanalConnector.connect()不断进行连接即可
2.3 关于建立连接的坑
2.3.1 失去连接的异常捕获
canal client阿里官方封装的异常CanalClientException,会封装IOException与ConnectException
2.3.2 再次连接的disconnect
项目代码建立连接的逻辑是不重新new CanalConnector,而是利用已经存在CanalConnector再次去建立连接,这时候就需要先调用disconnect()去改变连接状态,然后才能与canal服务端重新建立连接,不然canal服务端会因为connector的连接状态为true(private volatile boolean connected;)拒绝再次连接;