为什么需要协程池
虽然go语言在调度Goroutine已经优化的非常完善,开启一个Goroutine的代价非常小。但是,如果无休止的开辟Goroutine依然会出现高频率的调度Goroutine,那么依然会浪费很多上下文切换的资源。所以设计一个Goroutine池限制Goroutine数量是非常有必要的。
具体实现
先定义Job和Worker作为协程池控制的最基本单元。之前正好在学习网络编程,就用协程池来做了个实验。就借此来看看协程池的具体实现。
//协程池的最小工作单元,即具体业务处理结构体
type Job struct {
Connection net.Conn //客户端的连接
}
//队列,用来接收、发送请求
var JobQueue chan Job
//用于执行job,可以理解为job的管理者
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
//初始化Worker
func NewWorker(workerPool chan chan Job) Worker {
return Worker {
WorkerPool:workerPool,
JobChannel:make(chan Job),
quit:make(chan bool),
}
}
//运行Worker
func (w Worker) Start() {
go func() {
for {
//将可用的worker放进队列中
w.WorkerPool <- w.JobChannel
select {
case job := <- w.JobChannel:
//接收到具体请求时进行处理
HandleConnection(job.Connection)
case <-w.quit:
//接收停止请求
return
}
}
} ()
}
//发送停止请求
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
接下来,定义分配worker的结构体dispatcher。
type Dispatcher struct {
WorkerPool chan chan Job //worker的池子,控制worker的数量
WorkerList []Worker //worker的切片
}
//根据传入的值,创建对应数量的channel
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{
WorkerPool:pool,
}
}
//根据最大值,创建对应数量的worker
func (d *Dispatcher) Run() {
for i := 0; i < MaxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
d.WorkerList = append(d.WorkerList, worker)
}
//监听工作队列
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func (job Job) {
jobChannel := <-d.WorkerPool
jobChannel <- job
}(job)
}
}
}
//停止所有的worker
func (d *Dispatcher) Stop() {
for _, worker := range d.WorkerList {
worker.Stop()
}
}
以下是主函数的代码。
func main() {
l, e := net.Listen("tcp",":3207")
if e != nil {
fmt.Println(e)
return
}
//创建dispatcher
dispatcher := routinePool.NewDispatcher(routinePool.MaxWorkers)
dispatcher.Run()
//初始化工作队列
routinePool.JobQueue = make(chan routinePool.Job, routinePool.MaxQueue)
defer l.Close()
defer dispatcher.Stop()
for {
//接受客户端的连接
conn, err := l.Accept()
if err != nil {
return
}
job := routinePool.Job{
Connection:conn,
}
//客户端连接放入工作队列
routinePool.JobQueue <- job
}
}
对于客户端请求的处理,我这里只做了最简单的打印处理。
//解包
func Unpack(buffer []byte, readerChannel chan []byte) []byte {
length := len(buffer)
var i int
for i = 0; i < length; i++ {
if length < i + DataLen {
break
}
//根据长度来获取数据
messageLen := BytesToInt(buffer[i:i+DataLen])
if length < i + DataLen + messageLen {
break
}
data := buffer[i+DataLen:i+DataLen+messageLen]
readerChannel <- data
i += DataLen + messageLen - 1
}
if i == length {
return make([]byte, 0)
}
return buffer[i:]
}
//字节转换成整形
func BytesToInt(b []byte) int {
bytesBuffer := bytes.NewBuffer(b)
var x int32
binary.Read(bytesBuffer, binary.BigEndian, &x)
return int(x)
}
//处理客户端请求
func HandleConnection(conn net.Conn) {
defer func() {
fmt.Println(conn.RemoteAddr())
conn.Close()
}()
tempBuffer := make([]byte, 0)
readerChannel := make(chan []byte, 16)
//fmt.Println(conn.RemoteAddr())
go reader(readerChannel)
buffer := make([]byte, 1024)
for {
n, err := conn.Read(buffer)
if err != nil {
return
}
tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
}
}
func reader(readerChannel chan []byte) {
for {
select {
case data := <- readerChannel:
//fmt.Println(string(data))
data = data
}
}
}
这是几个用到的常量。
const MaxWorkers = 100000
const MaxQueue = 3000
const DataLen = 4
文章和实现参考了用go一分钟处理百万请求