MySQL 配置
[mysqld]
log-bin=mysql-bin
server-id=1
binlog-format=ROW
使用 go-mysql 读取 binlog
go get github.com/siddontang/go-mysql
代码实现 demo
package main
import (
"log"
"github.com/siddontang/go-mysql/canal"
"github.com/go-redis/redis/v8"
"context"
)
// Redis 配置
var ctx = context.Background()
var redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 地址
Password: "", // Redis 密码
DB: 0, // 默认DB
})
func main() {
cfg := canal.NewDefaultConfig()
cfg.Addr = "127.0.0.1:3306" // MySQL 地址
cfg.User = "root"
cfg.Password = "yourpassword"
cfg.Flavor = "mysql" // 数据库类型,默认为 MySQL
c, err := canal.NewCanal(cfg)
if err != nil {
log.Fatal(err)
}
// 注册回调函数,监听 binlog 事件
c.SetEventHandler(&handler{})
// 开始同步 binlog
c.Run()
}
type handler struct{}
// OnRow 是处理数据变动的回调函数
func (h *handler) OnRow(e *canal.RowsEvent) error {
// 解析 binlog 事件并处理
for _, row := range e.Rows {
log.Printf("table: %s, action: %s, data: %v", e.Table.Name, e.Action, row)
// 在这里进行同步到 Redis
// 例如:
if e.Action == "insert" || e.Action == "update" {
redisKey := e.Table.Name + ":id:" + string(row[0].([]byte)) // 主键作为 Redis key
redisClient.Set(ctx, redisKey, row, 0).Err()
} else if e.Action == "delete" {
redisKey := e.Table.Name + ":id:" + string(row[0].([]byte))
redisClient.Del(ctx, redisKey).Err()
}
}
return nil
}
func (h *handler) String() string {
return "MyEventHandler"
}
实现逻辑
- 连接 MySQL 并读取 binlog:通过 go-mysql 库监听 MySQL 的 binlog 事件(插入、更新、删除等)。
- 处理 binlog 事件:通过 OnRow 回调函数处理每一个变动的行数据。
- 更新 Redis:根据不同的事件类型(插入、更新、删除),更新或删除 Redis 中对应的键值对。