golang使用信号量模式并发安全的迁移千万条数据

由于公司业务需要: 需将 PostgreSQL 数据库中的9百万条数据 迁移到 MySQL.

现将迁移脚本的开发过程记录如下:

安装驱动库

go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres
go get -u gorm.io/driver/mysql

初始化数据库连接

import (
  "gorm.io/driver/postgres"
  "gorm.io/gorm"
)

var MySQLClientBI *gorm.DB
var PostgreSQLClient *gorm.DB

func init() {
    dsnPg := "host=localhost user=gorm password=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"
    pgDB, err := gorm.Open(postgres.Open(dsnPg), &gorm.Config{})
    if err != nil {
        fmt.Println(err)
    }
    PostgreSQLClient = pgDB
    
    dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
    db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})  
    if err != nil {
        fmt.Println(err)
    }
    MySQLClientBI = db
}

批量插入

GORM 的 CreateInBatches 方法可以用于批量插入数据,这确实有助于提高大量数据的插入效率。但是,有几点需要注意:

  1. 内存使用:即使使用 CreateInBatches,如果你首先从一个数据库中提取900万条记录并尝试将其存储在内存中,那么可能会出现内存使用过高的问题。你应该在查询数据时考虑分页或限制提取的记录数。
  2. 批次大小:为了达到最佳性能和避免潜在的问题,您需要确定合适的批次大小。例如,一次插入1000或5000条记录,而不是所有900万条记录。
  3. MySQL的限制:MySQL有一个max_allowed_packet参数,它定义了单个客户端发送到MySQL服务器的数据包的最大大小。批量插入时可能会触及此限制,导致错误。

基于以上考虑,建议以下方法:

  1. 分批从 PostgreSQL 中读取数据,例如每次读取5000条。
  2. 使用 CreateInBatches 将每批数据插入到 MySQL 中。

这样可以确保不会因为一次性处理大量数据而耗尽内存,并且可以在必要时轻松调整批次大小。

简化代码如下:

const batchSize = 5000
var offset = 0
for {
    // 从pg查,一次查 5000
    var cfgEventParamsValue []EventParamsValue
    result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
    if result.Error != nil {
        log.Fatalf("Error fetching from PostgreSQL: %v", result.Error)
    }
    if len(cfgEventParamsValue) == 0 {
        break
    }
    // 入mysql,一次入 5000
    err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
    if err != nil {
        log.Fatalf("Error inserting batch into MySQL: %v", err)
    }
    offset += batchSize
}

开启协程

使用 Go 的协程可以极大地提高数据迁移的效率,特别是当涉及到网络IO或数据库IO操作时。但请注意,太多的并发可能会对数据库造成压力,导致性能下降或其他问题,所以需要平衡。

代码如下:

package main

import (
    "fmt"
    "log"
    "sync"

    "gorm.io/driver/mysql"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

type EventParamsValue struct {
    ID    uint   `gorm:"primaryKey"`
    // 根据实际字段和类型调整以下内容
    FieldName string
}

var pgDB *gorm.DB
var mysqlDB *gorm.DB
var wg sync.WaitGroup   // 使用 sync.WaitGroup 来确保主程序等待所有的协程完成

const batchSize = 5000

func migrateBatch(offset int) {
    defer wg.Done()

     // 从pg查,一次查 5000
    var cfgEventParamsValue []EventParamsValue
    result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
    if result.Error != nil {
        log.Printf("Error fetching from PostgreSQL: %v", result.Error)
        return
    }

    if len(cfgEventParamsValue) == 0 {
        return
    }
    
    // 入mysql,一次入 5000
    err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
    if err != nil {
        log.Printf("Error inserting batch into MySQL: %v", err)
    }
}

func main() {
    // PostgreSQL 连接 (省略了代码...)

    // MySQL 连接 (省略了代码...)

    // 获取总记录数
    var totalRecords int64
    pgDB.Table("data_cfg.cfg_event_params_value").Count(&totalRecords)

    for offset := 0; offset < int(totalRecords); offset += batchSize {
        wg.Add(1)
        go migrateBatch(offset)  // 使用协程执行数据迁移
    }

    wg.Wait() // 等待所有协程完成
    fmt.Println("Migration complete!")
}

协程非常快,所以可能会很快地打开很多协程。如果发现数据库响应缓慢或有其他问题,可能需要引入一个限制并发数量的机制,例如使用通道 (channel) 或第三方库,如 semaphore

在执行此程序之前,务必先在非生产环境中测试,确保其行为如预期,并确保它不会对您的数据库产生不良影响。

上述这段代码根据数据总量和每批处理的数据量(batchSize)来决定开启多少个协程。

这里是决定开启协程数量的关键部分:

for offset := 0; offset < int(totalRecords); offset += batchSize {
    wg.Add(1)
    go migrateBatch(offset)  // 使用协程执行数据迁移
}

每次迭代中,我们都会开启一个新的协程。迭代的次数由总记录数(totalRecords)和每批的大小(batchSize)决定。

计算开启的协程数量的公式为:

numGoroutines = ceil(totalRecords / batchSize)

其中 ceil 是向上取整函数。例如,如果您有 9,000,000 条记录,并且每批大小是 5,000,那么您会开启 1,800 个协程。

需要注意的是,尽管协程在 Go 中非常轻量,但同时开启太多协程可能会导致数据库连接的问题,尤其是当数据库的连接池大小有限时。您可能需要考虑增加数据库的连接数限制或使用信号量来限制同时运行的协程数量,以保护数据库不被过度压迫。

增加信号量模式

为了手动控制协程数量,可以使用Go中的信号量模式,利用chan struct{}来达到这个目的。

package main

import (
    "fmt"
    "log"
    "sync"

    "gorm.io/driver/mysql"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

type EventParamsValue struct {
    ID    uint   `gorm:"primaryKey"`
    // 根据实际字段和类型调整以下内容
    FieldName string
}

var pgDB *gorm.DB
var mysqlDB *gorm.DB
var wg sync.WaitGroup

const batchSize = 5000
const maxGoroutines = 10 // 手动设置最大并发协程数量

var sem = make(chan struct{}, maxGoroutines)

func migrateBatch(offset int) {
    defer wg.Done()
    defer func() { <-sem }() // 释放一个协程位

    var cfgEventParamsValue []EventParamsValue
    result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
    if result.Error != nil {
        log.Printf("Error fetching from PostgreSQL: %v", result.Error)
        return
    }

    // 如果该批次没有数据,直接返回
    if len(cfgEventParamsValue) == 0 {
        return
    }

    log.Printf("Migrating records from offset %d to %d", offset, offset+batchSize-1) // 记录每批次迁移数据的起止

    err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
    if err != nil {
        log.Printf("Error inserting batch into MySQL from offset %d to %d: %v", offset, offset+batchSize-1, err)
    } else {
        log.Printf("Successfully migrated records from offset %d to %d", offset, offset+batchSize-1)
    }
}

func main() {
    // PostgreSQL 连接 (省略了代码...)

    // MySQL 连接 (省略了代码...)

    // 获取总记录数
    var totalRecords int64
    pgDB.Table("data_cfg.cfg_event_params_value").Count(&totalRecords)

    for offset := 0; offset < int(totalRecords); offset += batchSize {
        sem <- struct{}{} // 获取一个协程位
        wg.Add(1)
        go migrateBatch(offset)  // 使用协程执行数据迁移
    }

    wg.Wait() // 等待所有协程完成
    fmt.Println("Migration complete!")
}

在上面的代码中:

  • sem 是一个有限容量的通道,用于控制并发的协程数量。
  • sem <- struct{}{} 尝试向通道发送一个空结构,如果通道已满,这一行将会阻塞,直到有其他协程完成并释放一个位置。
  • defer func() { <-sem }() 保证当协程结束时,从sem通道中移除一个空结构,从而释放一个协程位置。

这样,一次只有maxGoroutines数量的协程能够并发运行。您可以根据需要调整maxGoroutines的值来控制并发数量。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容