java的生态就是好,实际开发的时候经常会用到数据同步,传统的做法就是写很多触发器,监听到变动的时候,处理同步逻辑,或者定时任务处理每天跑全量同步逻辑。canal这个确实模仿主从复制,直接数据库级别,效率和解耦程度就非常好用的。
Windows环境本地搭建canal php框架采用thinkphp8
前置 phpstudy 安装php8.2.9,composer安装好tp项目。
开源项目源码
https://github.com/xingwenge/canal-php
https://github.com/alibaba/canal
1.下载java客户端canal

image.png
这里我下载是这个最新稳定版本的这个。
解压后可以看到项目。

image.png
Windows双击启动 (需要先配置主从复制的从库)

image.png
2.配置主从复制的从库
在主库创建canal用户,并分配权限。
-- 创建无IP限制的用户(% 表示任意主机)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予复制权限
GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%';
-- REPLICATION CLIENT :允许 canal 查询主从同步状态(如 SHOW MASTER STATUS )。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
--要给 canal 用户赋予读取 所有库、所有表 的权限,需在 MySQL 中执行授权语句,确保 canal 能正常采集 binlog ,以下是具体操作:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限使更改生效
FLUSH PRIVILEGES;
双击启动canal

image.png
查看日志是否执行成功。

image.png
3.thinkphp8里运行监听
安装扩展
composer show xingwenge/canal_php -a 可以查看最新版,截止当前最新版本为v1.0.3
composer require xingwenge/canal_php:^1.0.3
安装成功后创建命令,然后配置命令
<?php
declare (strict_types = 1);
namespace app\command;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
/***
* php think canal
*/
class Canal extends Command
{
protected function configure()
{
// 指令配置
$this->setName('canal')
->setDescription('the canal command');
}
protected function execute(Input $input, Output $output)
{
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("127.0.0.1", 11111);
$client->checkValid();
$client->subscribe("1001", "example", ".*\\..*");
# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤
while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
/**
* @var Entry $entry
*/
foreach ($entries as $entry) {
$entryType = $entry->getEntryType();
if ($entryType !== EntryType::ROWDATA) {
continue; // 跳过事务头之类的
}
$rowChange = new RowChange();
$rowChange->mergeFromString($entry->getStoreValue());
$eventType = $rowChange->getEventType();
$dbName = $entry->getHeader()->getSchemaName(); // 数据库名
$tableName = $entry->getHeader()->getTableName(); // 表名
$eventName = $this->getEventName($eventType); // insert / update / delete
/**
* @var \Com\Alibaba\Otter\Canal\Protocol\RowData $rowData
*/
foreach ($rowChange->getRowDatas() as $rowData) {
$beforeData = $this->parseColumns($rowData->getBeforeColumns()); // 更新前
$afterData = $this->parseColumns($rowData->getAfterColumns()); // 更新后
echo "=====================\n";
echo "数据库: {$dbName}\n";
echo "数据表: {$tableName}\n";
echo "操作类型: {$eventName}\n";
echo "更新前数据:\n";
print_r($beforeData);
echo "更新后数据:\n";
print_r($afterData);
// 示例:你可以根据表名和操作类型做处理
if ($tableName === 'user') {
if ($eventName === 'insert') {
// 新增逻辑
} elseif ($eventName === 'update') {
// 修改逻辑
} elseif ($eventName === 'delete') {
// 删除逻辑
}
}
}
}
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
}
// 将 ColumnList 转为数组:["字段名" => "字段值"]
function parseColumns($columns): array {
$result = [];
foreach ($columns as $column) {
$result[$column->getName()] = $column->getValue();
}
return $result;
}
// 转换事件类型为字符串
function getEventName($eventType): string {
switch ($eventType) {
case EventType::INSERT:
return 'insert';
case EventType::UPDATE:
return 'update';
case EventType::DELETE:
return 'delete';
default:
return 'unknown';
}
}
}
被AI坑了,说是要ack确认消息。实际使用getId()获取消息的时候,源码已经自动确认了。

image.png
4.测试。
使用navicate连接数据库,修改数据表,增删改查。查看监听结果。成功监听。

image.png
5.其他
canal自带支持写入消息队列,处理消息,当业务量大的时候可以直接使用消息队列处理,比这种处理方式高效多,这种如果处理速度慢,会出现积压。