本片文章主要讲解一个简单的连接池,给定一个连接池完成所有分配的任务。
结构体部分
工作内容结构体
type Job struct {
id int
random int
}
结果结构体
type Result struct {
job Job
sum int
}
执行工作以及传递结果的缓存通道
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
分配任务,管道满则等待协程读取
func allocate(n int) {
for i := 1; i <= n; i++ {
jobs <- Job{id: i, random: rand.Intn(999)}
}
close(jobs)
}
执行工作部分代码通过wg等待协程执行完毕,由于使用range 遍历通道,所有协程需要等待通道中内容读取完毕才能执行完。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
results <- Result{job: job, sum: digit(job.random)}
}
wg.Done()
}
func digit(n int) int {
sum := 0
for n != 0 {
sum += n % 10
n /= 10
}
return sum
}
读取结果并打印
func result(done chan bool) {
for result := range results {
fmt.Printf("job id is %d,job random is %d,sum is %d\n",
result.job.id, result.job.random, result.sum)
}
done <- true
}
创建协程池
func createWorkerPool(n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
触发工作运行
func activateWorkerPool(count, parallel int) {
startTime := time.Now()
done := make(chan bool)
go allocate(count)
go result(done)
createWorkerPool(parallel)
<-done
endTime := time.Now()
fmt.Printf("exec this job has taken %s seconds\n", endTime.Sub(startTime))
}
func main() {
activateWorkerPool(10000, 10)
}
这样就使用了 10个协程完成 10000份工作