canal 基于Mysql数据库增量日志解析

canal 基于Mysql数据库增量日志解析

 1.前言

 最近太多事情 工作的事情,以及终身大事等等 耽误更新,由于最近做项目需要同步监听 未来电视 mysql的变更了解到公司会用canal做增量监听,就尝试使用了一下 这里做个demo 简单的记录一下。

 2.canal简介

 canal:主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的中间件
 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Xnip20200117_145806.png

 3.MySQL 注备复制原理

Xnip20200117_150056.png

  3.1 mysql主备复制工作原理

  1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

  3.2 canal 工作原理

  1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3.canal 解析 binary log 对象(原始为 byte 流)

 4.准备

 对于自建MySQL ,需要先开启 Binlog写入功能,并且配置binlog-format 为Row模式 在my.cnf中配置

Xnip20200117_151803.png

 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

 5.canal 下载安装配置

  5.1 canal下载

  canal 下载地址 (下载速度可能很慢)

  下载 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz

  解压后 可以看到如下结构


Xnip20200117_150905.png

  5.2 canal 初始配置

  配置修改:

vim conf/example/instance.properties

  如下:

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020

# position info 修改自己的数据库(canal要监听的数据库 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password 修改成自己 数据库信息的账号 (单独开一个 准备阶段创建的账号)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex  表的监听规则 
# canal.instance.filter.regex = blogs\.blog_info  
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex = 

  启动canal

sh bin/startup.sh

  查看server日志
  看到 the canal server is running now 表示启动成功

vi logs/canal/canal.log


2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

  查看instance的日志

vi logs/example/example.log

2020-01-08 15:25:33.864 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 
2020-01-08 15:25:33.998 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

  5.3 扩展 destination 配置

vi conf/canal.properties

  在canal.destinations 处可以配置当前server上部署的instance 列表 默认为 example ,我这里改成了 blogs最好对应数据库名称。一个instance 对应一个 数据库

Xnip20200117_153243.png
Xnip20200117_153644.png

 6.创建Java 客户端 监听canal 消费数据

  6.1 创建maven项目

  6.2 添加canal client POM 依赖

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>

  6.3 创建 canal 的客户端监听

  CanalMessageListener.java

  该类实现InitializingBean 主要是在初始化的时候 执行 init 方法,在init()方法中 创建 CanalConnector对象,连接需要监听的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password

  parse 方法 主要用于将监听的对象 通过反射等转换成对应的实体类

/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {


private CanalConnector connector;

@Autowired
private CanalConfig canalConfig;

@Autowired
private IParseDispatcher configParseDispatcher;

private void init() {
    //创建canal 监听 传入host port destination等参数
    connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
            canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    connector.connect();
    //  .*\..*
    connector.subscribe(".*\\..*");
    connector.rollback();

    new Thread(() -> {
    
        while (true) {
            Message message = connector.getWithoutAck(canalConfig.getBatchSize());
            long batchId = message.getId();
            long size = message.getEntries().size();
        //batchId == -1 表示没有数据变更
            if (batchId == -1 || size == 0) {
                System.out.println("empty data ");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
            //解析数据变更
                resoleveEntry(message.getEntries());
            }
        }

    }).start();

}
//解析数据变更
private void resoleveEntry(List<CanalEntry.Entry> entries) {
    CanalEntry.RowChange rowChange = null;
    for (CanalEntry.Entry row : entries) {
     //判断是否是 事物开始 和 事物结束 
        if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }
        try {
            rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
        String tableName = row.getHeader().getTableName();
        CanalEntry.EventType eventType = row.getHeader().getEventType();

        for (CanalEntry.RowData rowData : rowDataList) {
            if (eventType == CanalEntry.EventType.UPDATE) {
                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                Object object = parse(columns, tableName);
                log.info("收到的 object:{}", JsonUtils.marshalToString(object));
                //根据收到的对象 处理后续业务逻辑
            }
        }

    }
}

@Override
public void afterPropertiesSet() throws Exception {
    init();
}

//解析 List<CanalEntry.Column>对象到对应的 实体类
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根据配置好的map 从中根据key 表名 获取对应的映射后的 实体类class
    String className = configParseDispatcher.dispatch(tableName);
    Object entity = null;
    Class c = null;
    try {
        c = Class.forName(className);
        entity = c.newInstance();
    } catch (ClassNotFoundException e) {
        log.error("【未找到对应 {} 的 实体类 】", className);
    } catch (Exception e) {
    }

    for (CanalEntry.Column canalDataColumn : canalDatas) {
        String columnName = canalDataColumn.getName();
        Field[] fields = c.getDeclaredFields();

        for (Field field : fields) {
            Object fieldValue = null;
            field.setAccessible(true);
            String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
            log.info("【filedName: {}】", fiedName);
            if (fiedName.equals(columnName)) {
                try {
                    if (Long.class.equals(field.getType())) {
                        fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
                    }else if(Integer.class.equals(field.getType())){
                        fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
                    }else if(Double.class.equals(field.getType())){
                        fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
                    }else if(Date.class.equals(field.getType())){
                        try {
                            fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    }else{
                        fieldValue = canalDataColumn.getValue();
                    }
                    field.set(entity, fieldValue);
                    break;
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    return entity;
}
}

  application.yml
  配置canal 地址,以及表名和实体的映射规则

server:
port: 8881



application:
  canal:
    accessor: canal
    host: 127.0.0.1
    port: 11111
    username:
    password:
    destination: blogs
    batchSize: 30

    parse:   规则,根据表名获取对应要映射的 实体class
      rule:
        mapping:
          blog_info: com.johnny.canal.canal_test.entity.BlogInfo

  IParseDispatcher.java
  接口:用来根据表名key获取对应的 要映射的实体,这里写成接口是因为可以提供多种获取方式,比如我这里通过yml 配置去获取

/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {

 String dispatch(String key);

}

  ConfigParseDispatcher.java
  实现上面的接口,提供一种从 application.yml 获取初始源配置 根据 application.canal.parse.rule进行配置

/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {

private Map<String,String> mapping=new HashMap<>();

@Override
public String dispatch(String key) {
    return mapping.get(key);
}

}

  7.演示

  启动项目 此时控制台打印 empty data ,无数据变更

Xnip20200117_160125.png

  通过执行 在 canal监听的mysql 上执行 更新语句

update blog_info set blog_title = 'SpringBoot配置相关for canal test '  where id = 40

  debug 程序,当执行上面的update语句后 可以看到立即收到


Xnip20200117_160552.png

  通过parse方法解析为对应的 实体对象,后续做自己的业务逻辑 即可

Xnip20200117_160718.png

 8.总结

本篇主要介绍了canal是什么,如何下载安装和配置 ,以及提供了自己写的一个简单demo 。后续有机会深入了解一下canal的其他功能,比如 如何同步到Kafka/RocketMQ等等。。

个人博客网站 https://www.askajohnny.com 欢迎来访问!
本文由博客一文多发平台 OpenWrite 发布!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容