canal php_canal使用 数据同步解耦

java的生态就是好,实际开发的时候经常会用到数据同步,传统的做法就是写很多触发器,监听到变动的时候,处理同步逻辑,或者定时任务处理每天跑全量同步逻辑。canal这个确实模仿主从复制,直接数据库级别,效率和解耦程度就非常好用的。

Windows环境本地搭建canal php框架采用thinkphp8

前置 phpstudy 安装php8.2.9,composer安装好tp项目。

开源项目源码

https://github.com/xingwenge/canal-php
https://github.com/alibaba/canal

1.下载java客户端canal

image.png

这里我下载是这个最新稳定版本的这个。
解压后可以看到项目。


image.png

Windows双击启动 (需要先配置主从复制的从库)


image.png

2.配置主从复制的从库

在主库创建canal用户,并分配权限。

-- 创建无IP限制的用户(% 表示任意主机)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

-- 授予复制权限
GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%';


-- REPLICATION CLIENT :允许 canal 查询主从同步状态(如 SHOW MASTER STATUS )。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
--要给 canal 用户赋予读取 所有库、所有表 的权限,需在 MySQL 中执行授权语句,确保 canal 能正常采集 binlog ,以下是具体操作:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- 刷新权限使更改生效
FLUSH PRIVILEGES;

双击启动canal


image.png

查看日志是否执行成功。


image.png

3.thinkphp8里运行监听

安装扩展
composer show xingwenge/canal_php -a 可以查看最新版,截止当前最新版本为v1.0.3

composer require xingwenge/canal_php:^1.0.3

安装成功后创建命令,然后配置命令

<?php
declare (strict_types = 1);

namespace app\command;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;

/***
 * php think canal
 */
class Canal extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('canal')
            ->setDescription('the canal command');
    }

    protected function execute(Input $input, Output $output)
    {
        try {
            $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
            # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

            $client->connect("127.0.0.1", 11111);
            $client->checkValid();
            $client->subscribe("1001", "example", ".*\\..*");
            # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤

            while (true) {
                $message = $client->get(100);
                if ($entries = $message->getEntries()) {
                    /**
                     * @var Entry $entry
                     */
                    foreach ($entries as $entry) {
                        $entryType = $entry->getEntryType();
                        if ($entryType !== EntryType::ROWDATA) {
                            continue; // 跳过事务头之类的
                        }

                        $rowChange = new RowChange();
                        $rowChange->mergeFromString($entry->getStoreValue());
                        $eventType = $rowChange->getEventType();

                        $dbName = $entry->getHeader()->getSchemaName();      // 数据库名
                        $tableName = $entry->getHeader()->getTableName();    // 表名
                        $eventName = $this->getEventName($eventType);               // insert / update / delete

                        /**
                         * @var \Com\Alibaba\Otter\Canal\Protocol\RowData $rowData
                         */
                        foreach ($rowChange->getRowDatas() as $rowData) {
                            $beforeData = $this->parseColumns($rowData->getBeforeColumns()); // 更新前
                            $afterData = $this->parseColumns($rowData->getAfterColumns());   // 更新后

                            echo "=====================\n";
                            echo "数据库: {$dbName}\n";
                            echo "数据表: {$tableName}\n";
                            echo "操作类型: {$eventName}\n";

                            echo "更新前数据:\n";
                            print_r($beforeData);

                            echo "更新后数据:\n";
                            print_r($afterData);

                            // 示例:你可以根据表名和操作类型做处理
                            if ($tableName === 'user') {
                                if ($eventName === 'insert') {
                                    // 新增逻辑
                                } elseif ($eventName === 'update') {
                                    // 修改逻辑
                                } elseif ($eventName === 'delete') {
                                    // 删除逻辑
                                }
                            }
                        }
                    }

                }
                sleep(1);
            }

            $client->disConnect();
        } catch (\Exception $e) {
            echo $e->getMessage(), PHP_EOL;
        }
    }


    // 将 ColumnList 转为数组:["字段名" => "字段值"]
    function parseColumns($columns): array {
        $result = [];
        foreach ($columns as $column) {
            $result[$column->getName()] = $column->getValue();
        }
        return $result;
    }

// 转换事件类型为字符串
    function getEventName($eventType): string {
        switch ($eventType) {
            case EventType::INSERT:
                return 'insert';
            case EventType::UPDATE:
                return 'update';
            case EventType::DELETE:
                return 'delete';
            default:
                return 'unknown';
        }
    }
}

被AI坑了,说是要ack确认消息。实际使用getId()获取消息的时候,源码已经自动确认了。


image.png

4.测试。

使用navicate连接数据库,修改数据表,增删改查。查看监听结果。成功监听。


image.png

5.其他

canal自带支持写入消息队列,处理消息,当业务量大的时候可以直接使用消息队列处理,比这种处理方式高效多,这种如果处理速度慢,会出现积压。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容