Canal实现MySQL到Elasticsearch的高效数据同步

什么是Canal

Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的数据同步工具,主要功能是提供增量数据订阅和消费

Canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
image.png

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据


    46ec982fc5daaa5ed75c3cc10124efb7.png

实现流程

一、MySQL 配置

1、修改 MySQL 配置文件

MySQL的配置文件名称取决于操作系统:在Windows系统中是my.ini,而在Linux、macOS等类Unix系统中是my.cnf

  • Windows系统:通常位于 C:\ProgramData\MySQL\MySQL Server 8.0\my.ini(路径可能因版本不同而变化)
  • Linux系统:常见路径包括 /etc/my.cnf 或 /etc/mysql/my.cnf

以下参数可能已存在,存在则不需配置,一般只需要配置 binlog-format=ROW

[mysqld]
# 开启 binlog 日志功能,用于记录数据库的所有变更操作
# 注意:在较新版本的 MySQL 中默认可能已开启,但文件名可能不是 mysql-bin
log-bin=mysql-bin

# 设置 binlog 格式为 ROW 模式,记录每一行数据的具体变化
# 注意:默认格式通常是 STATEMENT 或 MIXED,需要显式配置为 ROW
binlog-format=ROW

# 配置 MySQL 服务器 ID,用于主从复制标识
# 注意:需保证唯一性,且不能与 canal 客户端的 slaveId 冲突
server_id=1

修改配置后需要重启MySQL服务!!!

检查配置是否成功

SHOW VARIABLES LIKE 'log_bin';  -- 必须 ON
SHOW VARIABLES LIKE 'binlog_format'; -- 必须 ROW

2、新增MySQL账号密码

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
Canal 配置文件中默认MySQL授权的账号密码是 canal/canal

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

二、Canal 配置

1、下载Canal工具 ,点击跳转即可

点击下载地址,选择版本后点击canal.deployer文件下载,下载完后解压到你自定义的canal目录中即可


image.png

命令方式下载:

mkdir -p /opt/canal
cd /opt/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
tar -zxvf canal.deployer-1.1.8.tar.gz

Canal有两个重要的配置文件:

// instance.properties中的配置项优先级高于canal.properties中的全局配置
// 而通过代码API传入的参数具有最高的优先级,会直接覆盖配置文件中的值
canal/conf/example/instance.properties
canal/conf/canal.properties

canal.properties 配置文件一般保持不变即可

# canal端口
canal.port=11111
# canal实例名称
canal.destinations=example

conf/example/instance.properties配置文件修改:

# mysql serverId,不要和 mysql 的 server_id 重复
canal.instance.mysql.slaveId=10
# master address,需要改成自己的数据库IP端口
canal.instance.master.address=127.0.0.1:3306 
# username/password,需要改成自己的数据库账号密码
# Canal进行连接时有传递数据库账号密码的,可以不用管此配置
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal

2、启动和关闭

#进入文件目录下的bin文件夹
#启动(window环境下直接点击 startup.bat 脚本即可启动)
sh startup.sh
#关闭
sh stop.sh

三、Springboot集成Canal

1、引入Canal依赖

 <!-- canal客户端依赖,用于连接和消费MySQL binlog数据 -->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.8</version>
            </dependency>

            <!-- canal协议依赖,提供binlog解析和传输协议支持 -->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.protocol</artifactId>
                <version>1.1.8</version>
            </dependency>

2、在 application.properties 配置文件中增加Canal配置参数

# ======================
# 配置canal
# ======================
# Canal服务器地址
canal.instance.host=127.0.0.1
# Canal服务器端口号
canal.instance.port=11111
# Canal实例名称,需要与Canal Server中的配置保持一致
canal.instance.destination=example
# Canal服务器用户名,用于连接数据库
canal.instance.dbUsername=canal
# Canal服务器密码,用于连接数据库
canal.instance.dbPassword=canal
# Canal客户端每次获取数据的批量大小,默认值为1000
canal.instance.batchSize=1000
# Canal客户端重连最大重试次数
canal.instance.maxRetries=5
# Canal客户端每次重连的时间间隔(毫秒)
canal.instance.retryInterval=60000



# canal处理FileInfo的数据库
canal.process.fileInfo.databaseName=point_general
# canal处理FileInfo的表名
canal.process.fileInfo.tableName=file_info

3、Canal 客户端实现

@Slf4j
@Component
public class CanalClient {

    @Value("${canal.instance.host:127.0.0.1}")
    private String host;

    @Value("${canal.instance.port:11111}")
    private int port;

    @Value("${canal.instance.destination:example}")
    private String destination;

    @Value("${canal.instance.dbUsername:canal}")
    private String dbUsername;

    @Value("${canal.instance.dbPassword:canal}")
    private String dbPassword;

    @Value("${canal.instance.batchSize:1000}")
    private int batchSize;

    @Value("${canal.instance.maxRetries:5}")
    private int maxRetries;

    @Value("${canal.instance.retryInterval:60000}")
    private int retryInterval;

    @Value("${canal.process.fileInfo.databaseName:point_general}")
    private String fileInfoDatabaseName;

    @Value("${canal.process.fileInfo.tableName:file_info}")
    private String fileInfoTableName;

    @Resource
    private FileInfoService fileInfoService;

    private volatile boolean running = false;
    private Thread canalThread;

    @PostConstruct
    public void start() {
        log.info("创建并启动Canal客户端线程");
        running = true;
        canalThread = new Thread(this::run);
        canalThread.setName("canal-client-thread");
        canalThread.setDaemon(false);
        canalThread.start();
    }

    public void stop() {
        log.info("停止Canal客户端线程");
        running = false;
        if (canalThread != null) {
            canalThread.interrupt();
        }
    }

    @PreDestroy
    public void destroy() {
        stop();
    }

    /**
     * 运行Canal客户端
     */
    public void run() {
        CanalConnector connector = null;
        try {
            // 创建连接器
            connector = reconnect();
            log.info("Canal客户端连接成功!!!");
            // 判断是否运行
            while (running && !Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize);
                long messageId = message.getId();
                int size = message.getEntries().size();

                if (messageId == -1 || size == 0) {
                    // 未获取到数据,短暂休眠以降低轮询频率
                    Thread.sleep(1000);
                    continue;
                }

                // 处理数据变更事件
                processMysqlDataChange(message.getEntries());
                // 确认处理完成
                connector.ack(messageId);
            }
        } catch (InterruptedException e) {
            log.info("Canal客户端线程被中断");
        } catch (Exception e) {
            log.error("Canal客户端线程异常", e);
        } finally {
            running = retryWithLimit(maxRetries, retryInterval, this::reconnect);
            if (!running && connector != null) {
                connector.disconnect();
            }
        }
    }

    /**
     * Canal连接服务
     */
    private CanalConnector reconnect() {
        // 创建连接器 参数: 服务器地址、实例名称、数据库用户名、数据库密码
        // 此数据库账号密码会覆盖掉instance.properties配置文件中的canal.instance.dbUsername和canal.instance.dbPassword的值
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(host, port), destination, dbUsername, dbPassword);
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        return connector;
    }

    /**
     * Canal客户端重连操作
     *
     * @param maxRetries    最大重试次数
     * @param retryInterval 重试间隔(毫秒)
     * @param operation     需要执行的操作
     * @return 是否执行成功
     */
    public boolean retryWithLimit(int maxRetries, long retryInterval, Runnable operation) {
        int retryCount = 0;
        while (retryCount < maxRetries) {
            try {
                operation.run();
                log.info("Canal客户端重连成功!!!");
                return true; // 执行成功,返回
            } catch (Exception e) {
                retryCount++;
                if (retryCount >= maxRetries) {
                    log.error("重试{}次后仍失败", retryCount, e);
                    return false;
                }

                try {
                    Thread.sleep(retryInterval);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }

        return false;
    }

    /**
     * 处理MySQL数据变更事件
     *
     * @param entries 从Canal获取的数据变更事件列表
     */
    public void processMysqlDataChange(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            try {
                // 跳过事务开始、结束、心跳、GTID事件
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                        entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND ||
                        entry.getEntryType() == CanalEntry.EntryType.HEARTBEAT ||
                        entry.getEntryType() == CanalEntry.EntryType.GTIDLOG) {
                    continue;
                }

                String databaseName = entry.getHeader().getSchemaName();
                if (StringUtils.isBlank(databaseName)) {
                    log.error("数据变更事件中未获取到数据库名称");
                    continue;
                }

                String tableName = entry.getHeader().getTableName();
                if (StringUtils.isBlank(tableName)) {
                    log.error("数据变更事件中未获取到表名称");
                    continue;
                }

                // 打印基本信息
                log.info("数据库: {}, 表名: {}, 操作类型:{}", databaseName, tableName, entry.getEntryType());

                // 处理数据行, 类型判断是防止官方后续增加其他类型
                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    // 处理文件上传数据同步到ES
                    if (Objects.equals(databaseName, fileInfoDatabaseName) && Objects.equals(tableName, fileInfoTableName)) {
                        try {
                            fileInfoService.syncToFileInfoEs(entry);
                        } catch (Exception e) {
                            log.error("同步 file info to ES failed, entry:{}", JSONUtil.toJsonStr(entry), e);
                            // 可以考虑实现重试机制或者发送告警
                        }
                    }
                }
            } catch (Exception e) {
                log.error("处理单个entry时发生异常: {}", JSONUtil.toJsonStr(entry), e);
                // 继续处理下一个entry,避免影响整个批次
            }
        }
    }
}
public interface FileInfoService extends IService<FileInfo> {

    /**
     * 同步文件信息到Elasticsearch
     *
     * @param entry CanalEntry对象,包含文件信息
     */
    void syncToFileInfoEs(CanalEntry.Entry entry) throws Exception;
}
@Service
public class FileInfoServiceImpl implements FileInfoService {
    @Resource
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
    
    @Override
    public void syncToFileInfoEs(CanalEntry.Entry entry) throws Exception {
        ByteString storeValue = entry.getStoreValue();
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
        CanalEntry.EventType eventType = rowChange.getEventType();
        
        if (eventType == CanalEntry.EventType.INSERT) {
            List<FileInfo> fileInfos = new ArrayList<>();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                fileInfos.add(createFileInfo(rowData.getAfterColumnsList()));
            }
            elasticsearchRestTemplate.save(fileInfos);
        } else if (eventType == CanalEntry.EventType.UPDATE) {
            List<FileInfo> afterFileInfoList = new ArrayList<>();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                afterFileInfoList.add(createFileInfo(rowData.getAfterColumnsList()));
            }
            elasticsearchRestTemplate.save(afterFileInfoList);
        } else if (eventType == CanalEntry.EventType.DELETE) {
            List<String> idList = new ArrayList<>();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                    if ("id".equals(column.getName())) {
                        idList.add(column.getValue());
                        break;
                    }
                }
            }
            if (!idList.isEmpty()) {
                for (String id : idList) {
                    elasticsearchRestTemplate.delete(id, FileInfo.class);
                }
            }
        }
    }
    
    private FileInfo createFileInfo(List<CanalEntry.Column> columnList) {
        FileInfo fileInfo = new FileInfo();
        for (CanalEntry.Column column : columnList) {
            String name = column.getName();
            String value = column.getValue();
            
            switch (column.getName()) {
                case FileInfoIndexField.ID:
                    fileInfo.setId(Long.valueOf(column.getValue()));
                    break;
                case FileInfoIndexField.FILE_NAME:
                    fileInfo.setFileName(column.getValue());
                    break;
                // 其他字段处理...
                default:
                    break;
            }
        }
        return fileInfo;
    }
}

四、测试验证, 亲测有效

image.png

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容