从一个简单的流水线入手
在Golang中,流水线由多个阶段组成,每个阶段之间通过channel连接,每个节点可以由多个同时运行的goroutine组成。
从最简单的流水线入手。下图的流水线由3个阶段组成,分别是A、B、C,A和B之间是通道aCh
,B和C之间是通道bCh
,A生成数据传递给B,B生成数据传递给C。
流水线中,第一个阶段的协程是生产者,它们只生产数据。最后一个阶段的协程是消费者,它们只消费数据。下图中A是生成者,C是消费者,而B只是中间过程的处理者。
下面这段代码:
·producer()负责生产数据,它会把数据写入通道,并把它写数据的通道返回。
·square()负责从某个通道读数字,然后计算平方,将结果写入通道,并把它的输出通道返回。
·main()负责启动producer和square,并且还是消费者,读取suqre的结果,并打印出来。
package main
import (
"fmt"
)
func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(inCh <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range inCh {
out <- n * n
}
}()
return out
}
func main() {
in := producer(1, 2, 3, 4)
ch := square(in)
// consumer
for ret := range ch {
fmt.Printf("%3d", ret)
}
fmt.Println()
}
流水线的特点
·每个阶段把数据通过channel传递给下一个阶段。
·每个阶段要创建1个goroutine和1个通道,这个goroutine向里面写数据,函数要返回这个通道。
·有1个函数来组织流水线,我们例子中是main函数。
实际优化
来结合一下实际业务:用户上传商品列表时需要匹配一些属性,商品的名字和收获仓库之类,但是需要用名字匹配出Id,最后整理出一张数据关系表。
type Arr struct{
CusName string
ItemName string
StoreName string
Cost int64
}
type IntoDBMap struct{
CusId int64
ItemId int64
StoreId int64
Cost int64
}
func IntoDB(){
InArr := make([]*Arr,0)
for k,_:=InArr {
//找出库里的初始数据
reCusArr:=getCus(CusId)
reItemArr:=getItem(ItemId)
reStoreArr:=getStore(StoreId)
//通过map进行去重和新ID入库
var CusMap[string]int64
for k,_:=range reCusArr{
CusMap[InArr[k].Name] = InArr[k].Id
}
...
if _,ok:=CusMap[InArr[k].CusName];!ok{
xorm.Insert(...)
getCus:=xorm.Get(...)
CusMap[InArr[k].CusName] = getCus.Id
}
...
//生成最后表格
inMap := &IntoDBMap{
CusId :
ItemId :
StoreId :
Cost :
}
xorm.Insert(inMap)
}
}
这样的串行逻辑,如果数据量一大,再加上大量的insert和get操作,就会导致请求超时。解决方法有很多,前端可以做分批上传,后台分批去接,最后整个数据表统一做入库处理,当然后端也需要先给前端一个返回值。
但是这一部分代码不能只丢给一个子线程去处理,这并没有提升代码的效率,甚至有点像在骗自己。
优化思路
project方法:负责生产数据,在这段业务里可以实现那几个map数据的生产,并把它写数据的通道返回。
square方法:负责循环判断上传数据是否存在,以及入库和更新map的操作,并把它的输出通道返回。
intoDb方法:负责启动producer和square,并把最后数据匹配和入库。