Go操作阿里开源的框架Canal用于MySQL实时binlog同步

第一章 Canal 服务端搭建

1.1 认识 Canal 框架

canal 官方开源github地址: canal ;下载地址:
canal 官方文档github地址:canal document
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

工作原理:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

1.2 搭建 Canal 服务框架

1.2.1 启动 MySQL 的 Binlog 功能

对于系统 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,修改MySQL配置文件my.cnf 或者 my.ini 中的配置,并重启 MySQL 服务;

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

1.2.2 启动 MySQL 的 Binlog 功能

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

1.2.3 下载配置 canal 服务

下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.6 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

或者直接选择文件下载


image.png

解压文件到指定文件夹 canal 下;
配置修改

配置文件路径:
canal\conf\example\instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

1.3 启动 Canal 框架

到 canal 文件下查找 bin 文件夹,startup.bat 就是启动文件。双击 startup.bat 文件即可启动 canal 框架;


image.png

如果启动界面无法启动,或者出现一闪而过,说明 canal 框架启动失败;可以在文件 startup.bat 所在文件夹下,启动 powershell,使用命令式执行 startup.bat 文件,可查看运行具体信息。如下所示:


image.png

1.4 问题解决

1.4.1 启动报错 Unrecognized VM option 'PermSize=128m'

# 错误描述信息
Unrecognized VM option 'PermSize=128m'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
image.png

搜索查找资料大多数都是说JDK问题,需要安装JDK1.8,配置JAVA_HOME路径到JDK1.8就能解决;此方法,我没有做测试,无法得知是否是可以解决;
仔细查看报错信息,是JAVA启动虚拟机失败,无法识别 VM 参数,因此是启动参数有问题。
解决方法:此问题是canal启动参数有问题,修改启动参数即可解决。

打开canal启动文件:canal\bin\startup.bat
set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m 
修改为
set JAVA_MEM_OPTS= -Xms128m -Xmx512m 

启动canal查看运行情况:


image.png

1.4.2 客户端连接canal报错 Debugger failed to attach: timeout during handshake

[destination = example , address = /127.0.0.1:3306 , EventParser] 
ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /127.0.0.1:3306 has an error, retrying.
caused by com.alibaba.otter.canal.parse.exception.CanalParseException: 
java.io.IOException: connect /127.0.0.1:3306 failure

查找多数都是是Java远程调试参数配置问题等等;后来查看canal日志文件,找到问题根本,是canal连接MySQL数据失败导致;
解决方法:修改canal用户权限;

查看user数据库中canal用户信息,重新配置用户权限,并更新数据;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

还有一种情况是用户密码方式有问题;myql8.0版本的密码加密方式为caching_sha2_password,所以修改为mysql_native_password 就行并重新更新密码。这也是我认为有很大可能的另一种情况;值得注意的是,我只测试了修改了用户权限,就解决问题,修改用户密码方式,没有测试,无法知晓是否解决

方法:
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; #更新一下用户密码

FLUSH PRIVILEGES; #刷新权限

第二章 Canal 客户端搭建——Go语言库 canal-go

canal-go 库开源地址为:canal-go
你可以直接下载 canal-go 库,使用项目中已经有的例子 canal-go/samples/main.go
也可创建新的项目,导入 canal-go 库,以下就是第二个方式。步骤如下:

1、创建项目 dbcanal

2、go mod init

3、创建 main.go 脚本

4、将以下代码复制进去(代码在文章结尾。)

5、go mod tidy 下载依赖

6、go run main.go 执行程序查看结果

image.png

7、更改数据库中的数据,再查看结果

MySQL 语句自己按照自己的数据库进行修改,增删改语句都可以进行监听查看。
INSERT INTO test.meter_base_protocol 
(name,age)
VALUES 
("lilei",18);
image.png

image.png

到此,所有关于 Go 操作阿里开源的框架 Canal 搭建完成,后续就是按照项目所需要求进行代码开发了, 祝大家好运。
注意:代码

package main

import (
    "fmt"
    "github.com/golang/protobuf/proto"
    "github.com/withlin/canal-go/client"
    pbe "github.com/withlin/canal-go/protocol/entry"
    "log"
    "os"
    "time"
)

func main() {

    // 192.168.199.17 替换成你的canal server的地址
    // example 替换成-e canal.destinations=example 你自己定义的名字
    //  该字段名字在 canal\conf\example\meta.dat 文件中,NewSimpleCanalConnector函数参数配置,也在文件中
    /**
      NewSimpleCanalConnector 参数说明
        client.NewSimpleCanalConnector("Canal服务端地址", "Canal服务端端口", "Canal服务端用户名", "Canal服务端密码", "Canal服务端destination", 60000, 60*60*1000)
        Canal服务端地址:canal服务搭建地址IP
        Canal服务端端口:canal\conf\canal.properties文件中
        Canal服务端用户名、密码:canal\conf\example\instance.properties 文件中
        Canal服务端destination :canal\conf\example\meta.dat 文件中
    */
    connector := client.NewSimpleCanalConnector("127.0.0.1", 11111,
        "canal", "canal", "example",
        60000, 60*60*1000)
    err := connector.Connect()
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }

    // https://github.com/alibaba/canal/wiki/AdminGuide
    //mysql 数据解析关注的表,Perl正则表达式.
    //
    //多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
    //
    //常见例子:
    //
    //  1.  所有表:.*   or  .*\\..*
    //  2.  canal schema下所有表: canal\\..*
    //  3.  canal下的以canal打头的表:canal\\.canal.*
    //  4.  canal schema下的一张表:canal\\.test1
    //  5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

    err = connector.Subscribe(".*\\..*")
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }

    for {

        message, err := connector.Get(100, nil, nil)
        if err != nil {
            log.Println(err)
            os.Exit(1)
        }
        batchId := message.Id
        if batchId == -1 || len(message.Entries) <= 0 {
            time.Sleep(300 * time.Millisecond)
            fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "===没有数据了===")
            continue
        }

        printEntry(message.Entries)

    }
}

func printEntry(entrys []pbe.Entry) {

    for _, entry := range entrys {
        if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
            continue
        }
        rowChange := new(pbe.RowChange)

        err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
        checkError(err)
        if rowChange != nil {
            eventType := rowChange.GetEventType()
            header := entry.GetHeader()
            fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))

            for _, rowData := range rowChange.GetRowDatas() {
                if eventType == pbe.EventType_DELETE {
                    printColumn(rowData.GetBeforeColumns())
                } else if eventType == pbe.EventType_INSERT {
                    printColumn(rowData.GetAfterColumns())
                } else {
                    fmt.Println("-------> before")
                    printColumn(rowData.GetBeforeColumns())
                    fmt.Println("-------> after")
                    printColumn(rowData.GetAfterColumns())
                }
            }
        }
    }
}

func printColumn(columns []*pbe.Column) {
    for _, col := range columns {
        fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
    }
}

func checkError(err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352