第一章 Canal 服务端搭建
1.1 认识 Canal 框架
canal 官方开源github地址: canal ;下载地址:
canal 官方文档github地址:canal document
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
工作原理:
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
1.2 搭建 Canal 服务框架
1.2.1 启动 MySQL 的 Binlog 功能
对于系统 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,修改MySQL配置文件my.cnf 或者 my.ini 中的配置,并重启 MySQL 服务;
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
1.2.2 启动 MySQL 的 Binlog 功能
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1.2.3 下载配置 canal 服务
下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.6 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
或者直接选择文件下载
解压文件到指定文件夹 canal 下;
配置修改
配置文件路径:
canal\conf\example\instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
1.3 启动 Canal 框架
到 canal 文件下查找 bin 文件夹,startup.bat 就是启动文件。双击 startup.bat 文件即可启动 canal 框架;
如果启动界面无法启动,或者出现一闪而过,说明 canal 框架启动失败;可以在文件 startup.bat 所在文件夹下,启动 powershell,使用命令式执行 startup.bat 文件,可查看运行具体信息。如下所示:
1.4 问题解决
1.4.1 启动报错 Unrecognized VM option 'PermSize=128m'
# 错误描述信息
Unrecognized VM option 'PermSize=128m'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
搜索查找资料大多数都是说JDK问题,需要安装JDK1.8,配置JAVA_HOME路径到JDK1.8就能解决;此方法,我没有做测试,无法得知是否是可以解决;
仔细查看报错信息,是JAVA启动虚拟机失败,无法识别 VM 参数,因此是启动参数有问题。
解决方法:此问题是canal启动参数有问题,修改启动参数即可解决。
打开canal启动文件:canal\bin\startup.bat
set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
修改为
set JAVA_MEM_OPTS= -Xms128m -Xmx512m
启动canal查看运行情况:
1.4.2 客户端连接canal报错 Debugger failed to attach: timeout during handshake
[destination = example , address = /127.0.0.1:3306 , EventParser]
ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /127.0.0.1:3306 has an error, retrying.
caused by com.alibaba.otter.canal.parse.exception.CanalParseException:
java.io.IOException: connect /127.0.0.1:3306 failure
查找多数都是是Java远程调试参数配置问题等等;后来查看canal日志文件,找到问题根本,是canal连接MySQL数据失败导致;
解决方法:修改canal用户权限;
查看user数据库中canal用户信息,重新配置用户权限,并更新数据;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
还有一种情况是用户密码方式有问题;myql8.0版本的密码加密方式为caching_sha2_password,所以修改为mysql_native_password 就行并重新更新密码。这也是我认为有很大可能的另一种情况;值得注意的是,我只测试了修改了用户权限,就解决问题,修改用户密码方式,没有测试,无法知晓是否解决
方法:
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; #更新一下用户密码
FLUSH PRIVILEGES; #刷新权限
第二章 Canal 客户端搭建——Go语言库 canal-go
canal-go 库开源地址为:canal-go
你可以直接下载 canal-go 库,使用项目中已经有的例子 canal-go/samples/main.go
也可创建新的项目,导入 canal-go 库,以下就是第二个方式。步骤如下:
1、创建项目 dbcanal
2、go mod init
3、创建 main.go 脚本
4、将以下代码复制进去(代码在文章结尾。)
5、go mod tidy 下载依赖
6、go run main.go 执行程序查看结果
7、更改数据库中的数据,再查看结果
MySQL 语句自己按照自己的数据库进行修改,增删改语句都可以进行监听查看。
INSERT INTO test.meter_base_protocol
(name,age)
VALUES
("lilei",18);
到此,所有关于 Go 操作阿里开源的框架 Canal 搭建完成,后续就是按照项目所需要求进行代码开发了, 祝大家好运。
注意:代码
package main
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/withlin/canal-go/client"
pbe "github.com/withlin/canal-go/protocol/entry"
"log"
"os"
"time"
)
func main() {
// 192.168.199.17 替换成你的canal server的地址
// example 替换成-e canal.destinations=example 你自己定义的名字
// 该字段名字在 canal\conf\example\meta.dat 文件中,NewSimpleCanalConnector函数参数配置,也在文件中
/**
NewSimpleCanalConnector 参数说明
client.NewSimpleCanalConnector("Canal服务端地址", "Canal服务端端口", "Canal服务端用户名", "Canal服务端密码", "Canal服务端destination", 60000, 60*60*1000)
Canal服务端地址:canal服务搭建地址IP
Canal服务端端口:canal\conf\canal.properties文件中
Canal服务端用户名、密码:canal\conf\example\instance.properties 文件中
Canal服务端destination :canal\conf\example\meta.dat 文件中
*/
connector := client.NewSimpleCanalConnector("127.0.0.1", 11111,
"canal", "canal", "example",
60000, 60*60*1000)
err := connector.Connect()
if err != nil {
log.Println(err)
os.Exit(1)
}
// https://github.com/alibaba/canal/wiki/AdminGuide
//mysql 数据解析关注的表,Perl正则表达式.
//
//多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
//
//常见例子:
//
// 1. 所有表:.* or .*\\..*
// 2. canal schema下所有表: canal\\..*
// 3. canal下的以canal打头的表:canal\\.canal.*
// 4. canal schema下的一张表:canal\\.test1
// 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
err = connector.Subscribe(".*\\..*")
if err != nil {
log.Println(err)
os.Exit(1)
}
for {
message, err := connector.Get(100, nil, nil)
if err != nil {
log.Println(err)
os.Exit(1)
}
batchId := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(300 * time.Millisecond)
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "===没有数据了===")
continue
}
printEntry(message.Entries)
}
}
func printEntry(entrys []pbe.Entry) {
for _, entry := range entrys {
if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
continue
}
rowChange := new(pbe.RowChange)
err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
checkError(err)
if rowChange != nil {
eventType := rowChange.GetEventType()
header := entry.GetHeader()
fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))
for _, rowData := range rowChange.GetRowDatas() {
if eventType == pbe.EventType_DELETE {
printColumn(rowData.GetBeforeColumns())
} else if eventType == pbe.EventType_INSERT {
printColumn(rowData.GetAfterColumns())
} else {
fmt.Println("-------> before")
printColumn(rowData.GetBeforeColumns())
fmt.Println("-------> after")
printColumn(rowData.GetAfterColumns())
}
}
}
}
}
func printColumn(columns []*pbe.Column) {
for _, col := range columns {
fmt.Println(fmt.Sprintf("%s : %s update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
}
}
func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}