数据库中间件:Canal

(1)理论基础

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。canal 就是一个同步增量数据的一个工具。

canal 应用场景1

我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。这就需要借助canal来实现mysql到redis的数据同步

canal 应用场景2

将用户的订单信息传入后台。

后台服务器将订单信息保存到mysql数据库。

又 canal 进行监控mysql中的写操作变化,将发生修改(Insert) 的数据写入到kafka

通过sparkStreaming读取Kafka中的数据,进行计算。

将计算好的结果,重新写入到服务器中,并返回到浏览器。

canal的作用

它可以实现增量同步,拿A商品举例,第一批数据中,A商品有100条,canal便会将这批新增的数据写入Kafka,再交给spark处理。第二次又新增一批数据,于是canal又将监控到新增数据写入到Kafka中。依次类推,最终由spark计算出结果返回出去。

canal 除了写入kafka 还能将数据写入到其他中间件(mq、elasticsearch、hbase等)

canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

(2)环境配置

1.【mysql准备】

使用命令查看数据库是否开启binlog模式:

log_bin属性值为ON,则binlog模式开启;为OFF则binlog模式关闭。

show variables like 'log_%';

2.【canal解压】

canal.deployer-1.1.6.tar.gz

3. 【canal配置】

conf\example\instance.properties

canal.instance.master.address=127.0.0.1:3306

canal.instance.dbUsername=root

canal.instance.dbPassword=123456

canal.instance.filter.regex=.*\\..*

conf\canal.properties

canal.port = 11111

canal.destinations = example

4. 【Mysql读取位置】

show master status;

reset master;

特别注意:读取位置position必须和meta.dat中一致

5.【启动canal】

bat

6. 【查看日志】

logs\example\example.log

(3)项目应用

1. 【pom.xml】

        <dependency>

            <groupId>top.javatool</groupId>

            <artifactId>canal-spring-boot-starter</artifactId>

            <version>1.2.1-RELEASE</version>

        </dependency>

2. 【application.yml】

canal:

  server: 127.0.0.1:11111

  destination: example

3. 【Canal开发】

EntryHandler<T>接口实现

// 说明:监听表中数据的变化【添加、修改、删除】、自动调用对应的方法

@Component

@CanalTable("person")

public class PersonHandler implements EntryHandler<Person> {

    @Override

    public void insert(Person person) {

        System.out.println("表中添加了数据");

        System.out.println(person);

    }

    @Override

    public void update(Person before, Person after) {

        System.out.println("表中修改了数据");

        System.out.println(before);

        System.out.println(after);

    }

    @Override

    public void delete(Person person) {

        System.out.println("表中删除了数据");

        System.out.println(person);

    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容