Go语言无缓冲通道创建协程池
这些协程池通常用于并发执行一组任务,最终组合起来完成某个功能。在这种情况下,使用无缓冲通道要比使用缓冲通道好,因为既不需要任务队列,也不需要一组协程配合执行,并且方便知道什么时候协程池正在执行任务,如果协程池中的所有协程都在忙,无法处理新的任务,也能及时通过通道通知调用者(分配给无缓冲通道的任务未处理会阻塞后续分配)。另外,使用无缓冲通道不会有任务在队列中丢失或卡住,所有任务都会被处理。
注:以上是来自https://xueyuanjun.com/post/22061
实现下列实例:
// worker/worker.go
pcakge worker
import (
"sync"
)
type Worker interface {
Task()
}
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
func New(maxGoroutines int) *Pool {
p := Pool {
work:make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines;i ++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
return &p
}
func (p *Pool) Run (w Worker) {
p.work <- w
}
func (p *Pool) ShutDown() {
close(p.work)
p.wg.Wait()
}
// main.go
package main
var langs = []string{
"Golang",
"PHP",
"JavaScript"
"Python",
"Java",
}
type langPrinter struct {
lang string
}
func (m *langPrinter) Task() {
log.Println(m.lang)
time.Sleep(time.Second)
}
func main() {
var setnum = 3
p := worker,New(3)
var wg syn c.WaitGroup
wg.Add(setnum * len(langs))
for i := 0; i < setnum; i++ {
lp := langPrinter{lang}
go func() {
p.Run(&lp)
wg.Donw()
}
}
wg.Wait()
p.ShutDown()
}
以上是无缓存创建协程池
现在我会对缓存池进行深入分析!
先对 worker.go 进行分析!
package worker
import (
"sync"
)
// 创建公共的接口,这个接口是执行任务逻辑的。
type Worker interface {
Task()
}
// 协程池 struct
type Pool struct {
work chan Worker 协程池
wg sync.WaitGroup 原子容器
}
// 实例化模块方法!开辟协程池,任务传递的相关功能!
func New(maxGoroutines int) * Pool {
// 开辟协程池,对其进行实例化
p := Pool{
work:make(chan Worker),
}
// 添加原子计数器
p.wg.Add(maxGoroutines)
// 一次处理任务个数,当 maxGoroutines = 3 。说明并发一次处理3个任务
for i := 0; i < maxGoroutines; i++ {
// 并发操作
go func() {
// 对 p.work (协程池) 进行遍历,把所有任务进行分发!
// 当 p.work 没有任务的时候,会进行阻塞
for w := range p.work {
w.Task()
}
// 对原子进行递减
p.wg.Done()
}()
}
return &p
}
// 任务写入方法!
func (p *Pool) Run (w Worker) {
// 把任务写入 p.work 的 chan 里面!
p.work <- w
}
// 销毁任务方法
func (p *Pool) ShutDown() {
// 把 p.work 协程池进行销毁
close(p.work)
// 等待所有 goroutine 执行完毕
p.wg.Wait()
}
package main
import (
"golang_no_buff/worker"
"log"
"sync"
"time"
)
var langs = []string{
"Golang",
"PHP",
"JavaScript",
"Python",
"Java",
}
type langPrinter struct {
lang string
}
// 对 langPrinter 绑定Task()方法,执行 langPrinter 的任务逻辑操作!
func (m *langPrinter) Task() {
log.Println(m.lang)
time.Sleep(time.Second)
}
func main() {
// 所执行任务数
var setnum = 3
p := worker.New(3)
// 创建原子计数队列
var wg sync.WaitGroup
// 添加原子数!3 * langs个数(5) = 15
wg.Add(setnum * len(langs))
// 对其循环3次
for i := 0; i < setnum; i++ {
// 循环 3 次 , 每一个 langs 有一个goroutine进行处理,当
for _, lang := range langs {
// 把 key 写到 langPrinter 里面去
lp := langPrinter{lang}
// 对其实现并发操作
go func() {
// p.Run() 是把数据写入 p.work 方法
p.Run(&lp)
// 对原子计数进行递减
wg.Done()
}()
}
}
// 等待所有线程执行
wg.Wait()
// 把任务进行销毁
p.ShutDown()
}
上面基本逻辑我以注释进行写入,里面原理需要自己体会!