背景
在项目开发过程中,往往会涉及到同时插入或修改多条数据,并且操作是需要保证事务原子性的,要么全部成功,要么全部失败,此时最好的办法是一次请求完成全部的数据操作,即将所有的数据拼接成一条SQL语句,但如果我们需要对数据修改前后进行保存作为记录日志的话,将数据拼接成一条SQL语句就行不通了,此时可以通过开启多协程去并发执行修改操作
datas := buildData()
//db.SetConnMaxLifetime(time.Second * 30)
tx, err := db.Begin()
if err != nil {
fmt.Println("tx err:%+v", err)
return
}
var ids []string
var wg sync.WaitGroup
var txErr error
for i := range datas {
ids = append(ids, datas[i].ID)
wg.Add(1)
go func(i int) {
defer wg.Done()
//r, getErr := tx.Query("select * from t_crud")
//if getErr != nil {
// fmt.Println("geterr:", getErr)
// return
//}
//fmt.Println(r.Columns())
//r.Close()
sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[i].ID, datas[i].Name, datas[i].Addr)
fmt.Println(sql)
res, err2 := tx.Exec(sql)
if err2 != nil {
fmt.Println("exec err:%+v", err2)
txErr = err2
return
}
fmt.Println(res.RowsAffected())
//rows, getErr2 := tx.QueryContext(context.Background(), "select * from t_crud where id=?", "id_99")
//if getErr2 != nil {
// fmt.Println("geterr2:", getErr2)
// return
//}
//
//fmt.Println(rows.Columns())
//rows.Close()
}(i)
}
wg.Wait()
if txErr != nil {
fmt.Println("has err roolback:", txErr)
tx.Rollback()
return
}
tx.Commit()
对数据遍历,每条数据开启一个协程,最后再汇总错误,当错误不为nil时回滚,否则提交,这样的缺点是有多少条数据就开启多少个协程,协程不好管理,可以通过channel对每次请求的协程数进行控制。
func buildData3() {
tx, err := db.Begin()
if err != nil {
fmt.Println("tx err:%+v", err)
return
}
var txErr error
datas := buildData()
execFun := func(index int) {
sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[index].ID, datas[index].Name, datas[index].Addr)
fmt.Println(sql)
res, err2 := tx.Exec(sql)
if err2 != nil {
fmt.Println("exec err:%+v", err2)
txErr = err2
return
}
fmt.Println(res.RowsAffected())
}
// 同时最多有10个协程在跑
DoBatch2(len(datas), 10, execFun)
if txErr != nil {
tx.Rollback()
panic(txErr)
return
}
tx.Commit()
}
// DoBatch 开启指定协程数批量执行
func DoBatch(max int, goroutineNum int, execFun func(index int)) {
var (
wg sync.WaitGroup
workLimiter = make(chan struct{}, goroutineNum)
)
for i := 0; i < max; i++ {
wg.Add(1)
select {
case workLimiter <- struct{}{}:
go func(i int) {
defer func() {
<-workLimiter
wg.Done()
}()
execFun(i)
}(i)
}
}
wg.Wait()
}
通过channel对当前的协程数量进行控制,同时只允许goroutineNum
个协程在跑,否则其他任务应该阻塞
driver:bad connection
值得一提的是,同一事务开启多协程的同时如果有并发读,那可能会出现driver:bad connection错误,原因是同一事务同一时间只能有一个可以进行读操作,读完之后需要将查询得到的Rows关闭
r, getErr := tx.Query("select * from t_crud")
if getErr != nil {
fmt.Println("geterr:", getErr)
return
}
fmt.Println(r.Columns())
r.Close()