一口气搞定系列-Canal组件

1,Canal 组件部署

  1. 安装配置MySQL
    1.1 安装 mysql,
    1.2 配置 mysql binlog使用ROW模式
    1.3 在MySQL添加对应的canal用户
    1.4 检查canal用户生效
  2. 下载canal并配置
    2.1 下载canal(1.1.4)
    2.2 配置 canal(1.1.4)
    2.3 启动canal (需要JDK>=1.6.25)

(1) 配置MySQL

(1.1) 安装MySQL

(1.2) 修改MySQL配置文件

canal的原理是基于mysql binlog技术,所以需要开启mysql的binlog写入功能,并且配置binlog模式为row.

[mysqld]  
log-bin=mysql-bin  # 开启 binlog
binlog-format=ROW  # 选择 ROW 模式
server_id=1        # 配置 MySQL replaction ,不能和 canal 的 slaveId 重复

(1.3) MySQL添加canal用户并授权

canal的原理是模拟自己为mysql slave,所以需要mysql slave的相关权限

CREATE USER canal IDENTIFIED BY 'canal';    

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 

FLUSH PRIVILEGES;

(1.4) 校验用户对应权限

  1. show master status ;
    如果正常显示binlog,则没问题,如果提示 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation ,则没有对应 REPLICATION CLIENT 权限

  2. show slave status ;
    如果提示 Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation ,则没有对应 REPLICATION SLAVE 权限

(2) 下载并启动canal

执行 ./bin/startup.sh 即可启动

(2.1) 下载canal

https://github.com/alibaba/canal/releases 选择合适的版本
下载 wget https://github.com/alibaba/canal/releases/download/canal-1.1.14/canal.deployer-1.1.14.tar.gz

(2.2) 修改配置

修改 conf/example/instance.properties
以下只列出比较重要的配置
详细的参数的说明可以参考这个网址

## mysql serverId  不能重复
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = database_wkq
canal.instance.connectionCharset = UTF-8
#table regex  需要监控的表 通过,分隔  也可以使用正则 .*\\..*
canal.instance.filter.regex = table_wkq,table_2,table_3
# table black regex
canal.instance.filter.black.regex =

(2.3) 启动canal

通过 sh bin/startup.sh 或者 ./bin/startup.sh 启动

启动后通过 jps -l 命令 可以看到 com.alibaba.otter.canal.deployer.CanalLauncher

canal启动时canal.log

canal.deployer-1.0.24/logs/canal/canal.log

2018-07-23 20:27:46.449 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-07-23 20:27:46.625 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.62.130:11111]
2018-07-23 20:27:47.576 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2018-07-23 20:27:47.721 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx1 successful.
2018-07-23 20:27:47.802 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx2 successful.
2018-07-23 20:27:47.862 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx3 successful.
2018-07-23 20:27:47.921 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx4 successful.
2018-07-23 20:27:47.987 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx5 successful.
2018-07-23 20:27:48.044 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx6 successful.
2018-07-23 20:27:48.094 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx7 successful.

(2.2) canal正常启动时instance对应的日志

canal.deployer-1.0.24/logs/example/example.log
2018-07-23 20:27:47.429 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-07-23 20:27:47.436 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]
2018-07-23 20:27:47.444 [canal-instance-scan-0] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2018-07-23 20:27:47.451 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx 
2018-07-23 20:27:47.453 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2018-07-23 20:27:47.666 [destination = xxx , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

(3)停止canal

sh stop.sh` 或 `./bin/stop.sh
2018-07-23 21:45:08.241 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## stop the canal server
2018-07-23 21:45:08.296 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[10.0.62.130:11111]
2018-07-23 21:45:08.296 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## canal server is down.

(4) 程序中使用

以下代码仅作为示例

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * CanalTest
 *
 * @author: weikeqin.cn@gmail.com
 * @date: 2020-05-30 08:26
 **/
@Slf4j
public class CanalTest {

    /**
     * @param args
     */
    public static void main(String args[]) {

        String canalHost = "127.0.0.1";
        int canalPort = 11111;
        String destination = "example";
        InetSocketAddress address = new InetSocketAddress(canalHost, canalPort);

        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "", "");
        // connector = CanalConnectors.newClusterConnector(addresses, destination, "", "");

        int batchSize = 1000;
        int emptyCount = 0;
        try {
            // 链接对应的canal server
            connector.connect();
            // 客户端订阅,不提交客户端filter,以服务端的filter为准
            connector.subscribe();
            // 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
            connector.rollback();

            int totalEmptyCount = 12000000;

            // 退出条件 一般是 while true
            while (emptyCount < totalEmptyCount) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {

                    emptyCount++;
                    log.info("empty count : " + emptyCount);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.info("", e);
                    }

                } else {
                    emptyCount = 0;
                    log.info("message[batchId={},size={}] ", batchId, size);

                    // 消费
                    consumeMsg(message.getEntries());
                }

                // 提交确认
                connector.ack(batchId);
                // 处理失败, 回滚数据
                // connector.rollback(batchId);
            }

            log.info("empty too many times, exit");
        } finally {
            // 释放链接
            connector.disconnect();
        }
    }

    /**
     * 消费消息
     *
     * @param entries
     */
    private static void consumeMsg(List<CanalEntry.Entry> entries) {

        // 这里只打印
        printEntry(entries);
        // TODO 其它操作

    }

    /**
     * @param entrys
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            log.info(String.format("================  binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType)
            );

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    log.info("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    log.info("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * @param columns
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        Map<String, String> map = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            map.put(column.getName(), column.getValue());
            //log.info(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
        log.info("{}", map);
    }


}

(5) canal复制原理

复制如何工作,整体上来说,复制有3个步骤:
(1) master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events);
(2) slave将master的binary log events复制到它的中继日志(relay log)中;
(3) slave读取中继日志中的事件,将其重放到备库数据之上。

下图描述了复制的过程:
[图片上传失败...(image-b777c3-1662962030216)]

(6) 遇到的问题

(6.1) Error When doing Client Authentication:ErrorPacket

Caused by: java.io.IOException: connect /127.0.0.1:3306 failure:java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:208)
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:71)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:56)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
    at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:157)
    at java.lang.Thread.run(Thread.java:748)

原因 用户名密码不正确

(6.2) Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx
[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
 with command: show master status

用canal账户登录后发现可以查看对应数据库对应表的数据,但是 show master status 提示 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

1、instance.properties配置文件里配置的用户没有REPLICATION权限
2、canal instance.properties 配置错误
3、配置文件里用户名密码不正确
4、MySQL对应用户不存在
5、MySQL配置不对

给canal用户对应的replication权限
grant replication client on *.* to 'canal'@'%';
flush privileges

(6.3) Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation

[destination = xxx , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx[java.io.IOException: Received error packet: errno = 1227, sqlstate = 42000 errmsg = Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
    at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:95)

Access denied 没权限 需要给对应账户授权
REPLICATION SLAVE 常用于建立复制时所需要用到的用户权限,也就是slave server必须被master server授权具有该权限的用户,才能通过该用户复制。
并且”SHOW SLAVE HOSTS”这条命令和REPLICATION SLAVE权限有关,否则执行时会报错:

REPLICATION CLIENT 不可用于建立复制,有该权限时,只是多了可以使用如”SHOW SLAVE STATUS”、”SHOW MASTER STATUS”等命令。
在5.6.6版本以后,也可以使用”SHOW BINARY LOGS”。

GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%'`
`flush privileges

(6.4) canal用了UseConcMarkSweepGC不能用JDK14

Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option PermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option MaxPermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option UseConcMarkSweepGC; support was removed in 14.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option CMSParallelRemarkEnabled; support was removed in 14.0
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

修改 bin/start.sh 文件,修改对应的JAVA路径

## set java path
if [ -z "$JAVA" ] ; then
  #JAVA=$(which java)
  JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java"
fi

(6.5) com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused

Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198)
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)

canal没启动 或者 canal挂了
配置被删了,检查对应 destinationinstance.properties
instance.properties 没配置

(7) canal-admin后台管理

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
简单来说,canal-admin是一个后台维护系统,简化了配置canal的工作,提高了效率,终于不用到服务器上一个一个配了

访问地址 http://127.0.0.1:8089/

(7.1) canal-admin的核心模型主要有

instance,对应canal-server里的instance,一个最小的订阅mysql的队列
server,对应canal-server,一个server里可以包含多个instance
集群,对应一组canal-server,组合在一起面向高可用HA的运维

References

[1] canal/wiki
[2] canal-AdminGuide
[3] ClientExample
[4] Canal-Admin-QuickStart
[5] Canal-Admin-Guide
[6] canal配置使用
[7] Mysql 普通账户授权replication client后登录失败问题
[8] REPLICATION SLAVE 与 REPLICATION CLIENT 权限
[9] 对replication slave,replication client的一点说明
[10] MySQL 5.6 Reference Manual – 6.2.1 Privileges Provided by MySQL
[11] SimpleCanalClientTest
[12] ClusterCanalClientTest

2,原理简介

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

(1)数据库主从同步

  • Mysql Master 首先将数据写入二进制文件(binary log),可以通过 show binlog events 进行查看
  • Mysql Slave 将Mysql 的 binary log events 拷贝到中间日志(relay log)
  • Mysql Slave 重放 relay log中事件,将数据变更反映他自己的数据
原理.png

(2)同步一致性问题

「MySQL的复制分为:异步复制、半同步复制、全同步复制。」

1),异步复制

MySQL默认的复制即是异步复制,主库在执行完客户端提交的事务后会立即将结果返给给客户端,并不关心从库是否已经接收并处理,这样就会有一个问题,「主如果crash掉了,此时主上已经提交的事务可能并没有传到从库上,如果此时,强行将从提升为主,可能导致新主上的数据不完整。」

2),全同步复制

指当主库执行完一个事务,所有的从库都执行了该事务才返回给客户端。「因为需要等待所有从库执行完该事务才能返回」,所以全同步复制的性能必然会收到严重的影响。

3),半同步复制

是介于全同步复制与全异步复制之间的一种,「主库只需要等待至少一个从库节点收到」并且 Flush Binlog 到 Relay Log 文件即可,主库不需要等待所有从库给主库反馈。同时,「这里只是一个收到的反馈,而不是已经完全完成并且提交的反馈」,如此,节省了很多时间。

(3)canal.admin、adapter和deployer的区别

canal.example

是demo工程

canal-admin

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

canal-adapter

增加客户端数据落地的适配及启动功能(支持Sync HBase、Sync ES 等)

canal 1.1.1版本之后, 内置增加客户端数据同步功能,

canal-deployer

这个就相当于canal的服务端,启动它才可以在客户端接收数据库变更信息。

3,常见问题排查

(1)表结构元数据的存储问题(配置项里面使用了tsdb也就是时序数据库的字眼,下面就称为tsdb功能)。

  • 解决方案1:

    environment

    • canal 1.1.1
    • mysql 5.7

    Issue Description

    当前测试过修改表结构的情况(注意:h2.mv.db的文件所在路径为:CANAL_HOME/canal.deployer/conf/example/):
    1.表添加字段:需要重启canal,不重启同步失效;删除 h2.mv.db 后再重启,同步生效。
    2.表修改字段:不需要重启,只能同步未修改前的表结构数据(返回的还是未修改字段名的字段);删除 h2.mv.db 后再重启,修改的字段生效。
    3.删除字段不需要重启,也不需要删除 h2.mv.db,即时生效。

  • 解决方案2:

    默认开启tsdb功能,也就是会通过h2数据库缓存解析的表结构,但是实际情况下,如果上游变更了表结构,h2数据库对应的缓存是不会更新的,这个时候一般会出现神奇的解析异常,异常的信息一般如下:

    Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:数据库名称.表名称,新表结构的字段数量 vs 缓存表结构的字段数量;
    

    该异常还会导致一个可怕的后果:解析线程被阻塞,也就是binlog事件不会再接收和解析。这个问题笔者也查看过很多Issue,大家都认为是一个严重的BUG,目前认为比较可行的解决方案是:禁用tsdb功能(真的够粗暴),也就是canal.instance.tsdb.enable设置为false。如果不禁用tsdb功能,一旦出现了该问题,必须要先停止Canal服务,接着删除$CANAL_HOME/conf/目标数据库实例标识/h2.mv.db文件,然后启动Canal服务。

    因为这个比较坑的问题,笔者在生产禁用了tsdb功能,并且添加了DDL语句的处理逻辑,直接打到钉钉预警上并且@整个群的人。

(2)Mysql的数据binlog的格式必须是Row模式

Binlog解析错误:重复解析/DML解析为QUERY

  • INSERT/UPDATE/DELETE被解析为Query或DDL语句
  • Binlog重复解析,即一个操作又有QUERY消息,又有对应的INSERT/UPDATE/DELETE消息。

1),Row模式

Binlog日志中仅记录哪一条记录被修改了,修改成什么样了,会非常清楚的记录下每一行数据修改的细节,「Master修改了哪些行,slave也直接修改对应行的数据」

问1:INSERT/UPDATE/DELETE被解析为Query或DDL语句?

答1:出现这类情况主要原因为收到的binlog就为Query事件,比如:

  1. binlog格式为非row模式,通过show variables like 'binlog_format'可以查看. 针对statement/mixed模式,DML语句都会是以SQL语句存在
  2. mysql5.6+之后,在binlog为row模式下,针对DML语句通过一个开关(binlog-rows-query-log-events=true, show variables里也可以看到该变量),记录DML的原始SQL,对应binlog事件为RowsQueryLogEvent,同时也有对应的row记录. ps. canal可以通过properties设置来过滤:canal.instance.filter.query.dml = true

2),Statement模式

每一条会修改数据的sql都会记录到master的binlog中,「slave在复制的时候sql进程会解析成和原来master端执行相同的sql再执行。」

3),Mix模式

「在 Mixed 模式下,MySQL 会根据执行的每一条具体的 SQL 语句来区分对待记录的日志形式,也就是在 statement 和 row 之间选择一种。」

(3)Filter失效

配置方式有两种,一种是使用配置文件的方式;另一种是使用代码的方式指定;
使用代码的方式优先级要大于配置的方式;那么此时就会出现导致Filter失效。

  • 配置文件方式

    我们通常在canal-server端的conf/example/instance.properties文件中进行设置

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表:canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
  • 使用代码的方式

    也可以在客户端与canal进行连接时,用客户端的connector.subscribe("xxxxxxx");来覆盖服务端初始化时的设置。

(4)消费落后

部署完Canal后,在遇到数据库写入高峰期,就遇到了数据延迟问题。数据延迟还是小事,但是一旦延迟到堆满了内存缓冲区,不消费的话,新的消息就进不来了。

具体的设计可以参考这个网址

解决方案:

「一个可行的解决办法是,将消息拉取后,写入消息队列(如RabbitMQ/Kafka),用消息队列来堆积消息处理,来保证大量消息堆积后不会导致canal卡死,并且可以支持数据持久化。」

(5),未配置MySQL-master连接信息

  • 报错信息
2021-12-12 20:11:04.251 [destination = example , address = null , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - parse events has an error
com.alibaba.otter.canal.parse.exception.CanalParseException: illegal connection is null
  • 修改配置文件信息,追加master的配置

    canal.instance.master.address = 127.0.0.1:3306

  • 校验meta,dat 元数据信息

[yunwei@host-192-124-16-73 example]# ls
h2.mv.db  h2.trace.db  instance.properties  meta.dat
[yunwei@host-192-124-16-73 example]# pwd
/app/canal.deployer/conf/example
[yunwei@host-192-124-16-73 example]# cat meta.dat 
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"192.124.16.90","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000008","position":4918,"serverId":1,"timestamp":1639312041000}}}],"destination":"example"}
  • 在数据库中进行对比查看
-- 查看binlog的日志存储
mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       177 |
| mysql-bin.000002 |      1131 |
| mysql-bin.000003 |       792 |
| mysql-bin.000004 |  45420039 |
| mysql-bin.000005 |      2943 |
| mysql-bin.000006 |       862 |
| mysql-bin.000007 |      9082 |
| mysql-bin.000008 |      4949 |
+------------------+-----------+
8 rows in set (0.00 sec)

-- 查看当前已经消费到了什么位置 
mysql> show binlog events in 'mysql-bin.000008';
+------------------+------+----------------+-----------+-------------+---------------------------------------+
| Log_name         | Pos  | Event_type     | Server_id | End_log_pos | Info                                  |
+------------------+------+----------------+-----------+-------------+---------------------------------------+
| mysql-bin.000008 |    4 | Format_desc    |         1 |         123 | Server ver: 5.7.27-log, Binlog ver: 4 |
| mysql-bin.000008 |  123 | Previous_gtids |         1 |         154 |                                       |
| mysql-bin.000008 |  154 | Anonymous_Gtid |         1 |         219 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 |  219 | Query          |         1 |         291 | BEGIN                                 |
| mysql-bin.000008 |  291 | Table_map      |         1 |         402 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 |  402 | Write_rows     |         1 |         808 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 |  808 | Xid            |         1 |         839 | COMMIT /* xid=108 */                  |
| mysql-bin.000008 |  839 | Anonymous_Gtid |         1 |         904 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 |  904 | Query          |         1 |         976 | BEGIN                                 |
| mysql-bin.000008 |  976 | Table_map      |         1 |        1087 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 | 1087 | Write_rows     |         1 |        1493 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 | 1493 | Xid            |         1 |        1524 | COMMIT /* xid=115 */                  |
| mysql-bin.000008 | 1524 | Anonymous_Gtid |         1 |        1589 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 | 1589 | Query          |         1 |        1661 | BEGIN                                 |
| mysql-bin.000008 | 1661 | Table_map      |         1 |        1772 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 | 1772 | Write_rows     |         1 |        2178 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 | 2178 | Xid            |         1 |        2209 | COMMIT /* xid=123 */                  |
| mysql-bin.000008 | 2209 | Anonymous_Gtid |         1 |        2274 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 | 2274 | Query          |         1 |        2346 | BEGIN                                 |
| mysql-bin.000008 | 2346 | Table_map      |         1 |        2457 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 | 2457 | Write_rows     |         1 |        2863 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 | 2863 | Xid            |         1 |        2894 | COMMIT /* xid=158 */                  |
| mysql-bin.000008 | 2894 | Anonymous_Gtid |         1 |        2959 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 | 2959 | Query          |         1 |        3031 | BEGIN                                 |
| mysql-bin.000008 | 3031 | Table_map      |         1 |        3142 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 | 3142 | Write_rows     |         1 |        3548 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 | 3548 | Xid            |         1 |        3579 | COMMIT /* xid=192 */                  |
| mysql-bin.000008 | 3579 | Anonymous_Gtid |         1 |        3644 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 | 3644 | Query          |         1 |        3716 | BEGIN                                 |
| mysql-bin.000008 | 3716 | Table_map      |         1 |        3827 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 | 3827 | Write_rows     |         1 |        4233 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 | 4233 | Xid            |         1 |        4264 | COMMIT /* xid=215 */                  |
| mysql-bin.000008 | 4264 | Anonymous_Gtid |         1 |        4329 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'  |
| mysql-bin.000008 | 4329 | Query          |         1 |        4401 | BEGIN                                 |
| mysql-bin.000008 | 4401 | Table_map      |         1 |        4512 | table_id: 110 (zhsq.t_vehicle_record) |
| mysql-bin.000008 | 4512 | Write_rows     |         1 |        4918 | table_id: 110 flags: STMT_END_F       |
| mysql-bin.000008 | 4918 | Xid            |         1 |        4949 | COMMIT /* xid=234 */                  |
+------------------+------+----------------+-----------+-------------+---------------------------------------+
37 rows in set (0.00 sec)

参考连接

1,监听Mysql表的变化,使用Canal

2,【开源实战】Canal部署常见问题:重复解析/Filter失效/消费落后

3,Canal 1.1.4 避坑指南

4,修改表结构之后是否重启Canal

5,Canal笔记

6,配置项详细说明

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容