问题
开发过程中,有批量处理的逻辑,依赖下游一同的接口(这个接口耗时感人,400 ~ 600ms),如果串行执行批量操作的话。。emmm。。所以想法就是尽量减少耗时,并行执行~
思路&解法
golang在这个方面,天然有优势。写了个通用的小工具(代码如下),并行执行任务,其中,ProcFunc是任务的具体执行逻辑,ProCtx是任务的上下文结构体,一个生产者向生产队列生产上下文,paralNum个goroutine作为worker并行执行任务,worker执行完后向consum队列返回上下文(包含输出和err信息),主线程获取执行完的结果。如果超时任务将会通过goCtx被取消。
package common
import (
"context"
"time"
)
// 处理函数定义,根据自己需求变动
type ProcFunc func(p interface{}) (interface{}, error)
type ProCtx struct {
In interface{}
DoFunc ProcFunc
Out interface{}
err error
}
func Produce(proChan chan *ProCtx, items []*ProCtx) {
for _, item := range items {
proChan <- item
}
}
func Worker(goCtx context.Context, proChan chan *ProCtx, consumChan chan *ProCtx) {
for {
select {
case item := <-proChan:
item.Out, item.err = item.DoFunc(item.In)
consumChan <- item
case <-goCtx.Done():
// 任务被父线程取消了
//println("任务取消!!!!!!")
return
}
}
}
/*
ps:执行任务的上下文
timeOut:超时时长,单位毫秒
paralNum:并行度
*/
func ParalProc(ps []*ProCtx, timeOut time.Duration, paralNum int) []*ProCtx {
n := len(ps)
ret := []*ProCtx{}
pChan := make(chan *ProCtx, n)
cChan := make(chan *ProCtx, n)
timeOutChan := time.After(timeOut)
contex, cancel := context.WithCancel(context.Background())
go Produce(pChan, ps)
for i := 0; i < paralNum; i++ {
go Worker(contex, pChan, cChan)
}
L:
for i := 0; i < n; i++ {
select {
case item := <-cChan:
ret = append(ret, item)
case <-timeOutChan:
// 超时取消子goroutine的任务
//println("################超时啦#################")
cancel()
break L
}
}
cancel() // 注:这里之前漏掉了,导致了内存泄漏。。。
return ret
}
使用示例
package main
import (
"errors"
"fanoutfanin/paral"
"math/rand"
"time"
)
type A struct {
Num int
}
func TestProc(p interface{}) (interface{}, error) {
a, ok := p.(A)
if !ok {
return nil, errors.New("type wrong!!!!!!!!!")
}
//time.Sleep(time.Second * time.Duration(10))
time.Sleep(time.Second * time.Duration(rand.Intn(10)))
println(" -> ", a.Num)
return a.Num * a.Num, nil
}
func main() {
n := 100
ps := []*paral.ProCtx{}
for i := 0; i < n; i++ {
ps = append(ps, ¶l.ProCtx{
In: A{i},
DoFunc: TestProc,
})
}
ret := paral.ParalProc(ps, time.Second*time.Duration(1), len(ps))
for _, a := range ret {
println(a.Out.(int))
}
println("**********************", len(ret))
time.Sleep(time.Second * time.Duration(rand.Intn(5)))
}
改进点
- 某些场景能搞个线程池是最好的(虽然go一下就好,但是可以装13,哈哈哈)
- 无缓冲的chan也是可以的