初探go的协程池

为什么需要协程池

虽然go语言在调度Goroutine已经优化的非常完善,开启一个Goroutine的代价非常小。但是,如果无休止的开辟Goroutine依然会出现高频率的调度Goroutine,那么依然会浪费很多上下文切换的资源。所以设计一个Goroutine池限制Goroutine数量是非常有必要的。

具体实现

先定义Job和Worker作为协程池控制的最基本单元。之前正好在学习网络编程,就用协程池来做了个实验。就借此来看看协程池的具体实现。

//协程池的最小工作单元,即具体业务处理结构体
type Job struct {
    Connection net.Conn  //客户端的连接
}

//队列,用来接收、发送请求
var JobQueue chan Job

//用于执行job,可以理解为job的管理者
type Worker struct {
    WorkerPool chan chan Job
    JobChannel chan Job
    quit chan bool
}

//初始化Worker
func NewWorker(workerPool chan chan Job) Worker {
    return Worker {
        WorkerPool:workerPool,
        JobChannel:make(chan Job),
        quit:make(chan bool),
    }
}

//运行Worker
func (w Worker) Start() {
    go func() {
        for {
            //将可用的worker放进队列中
            w.WorkerPool  <- w.JobChannel
            select {
            case job := <- w.JobChannel:
                //接收到具体请求时进行处理
                HandleConnection(job.Connection)
            case <-w.quit:
                //接收停止请求
                return
            }
        }
    } ()
}

//发送停止请求
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

接下来,定义分配worker的结构体dispatcher。

type Dispatcher struct {
    WorkerPool chan chan Job    //worker的池子,控制worker的数量
    WorkerList []Worker         //worker的切片
}

//根据传入的值,创建对应数量的channel
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{
        WorkerPool:pool,
    }
}

//根据最大值,创建对应数量的worker
func (d *Dispatcher) Run() {
    for i := 0; i < MaxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
        d.WorkerList = append(d.WorkerList, worker)
    }
    //监听工作队列
    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            go func (job Job) {
                jobChannel := <-d.WorkerPool
                jobChannel <- job
            }(job)
        }
    }
}

//停止所有的worker
func (d *Dispatcher) Stop() {
    for _, worker := range d.WorkerList {
        worker.Stop()
    }
}

以下是主函数的代码。

func main() {
    l, e := net.Listen("tcp",":3207")
    if e != nil {
        fmt.Println(e)
        return
    }

    //创建dispatcher
    dispatcher := routinePool.NewDispatcher(routinePool.MaxWorkers)
    dispatcher.Run()
    //初始化工作队列
    routinePool.JobQueue = make(chan routinePool.Job, routinePool.MaxQueue)

    defer l.Close()
    defer dispatcher.Stop()

    for {
        //接受客户端的连接
        conn, err := l.Accept()
        if err != nil {
            return
        }

        job := routinePool.Job{
            Connection:conn,
        }
        //客户端连接放入工作队列
        routinePool.JobQueue <- job
    }
}

对于客户端请求的处理,我这里只做了最简单的打印处理。

//解包
func Unpack(buffer []byte, readerChannel chan []byte) []byte {
    length := len(buffer)

    var i int
    for i = 0; i < length; i++ {
        if length < i + DataLen {
            break
        }
        //根据长度来获取数据
        messageLen := BytesToInt(buffer[i:i+DataLen])
        if length < i + DataLen + messageLen {
            break
        }
        data := buffer[i+DataLen:i+DataLen+messageLen]
        readerChannel <- data

        i += DataLen + messageLen - 1
    }

    if i == length {
        return make([]byte, 0)
    }
    return buffer[i:]
}

//字节转换成整形
func BytesToInt(b []byte) int {
    bytesBuffer := bytes.NewBuffer(b)

    var x int32
    binary.Read(bytesBuffer, binary.BigEndian, &x)

    return int(x)
}

//处理客户端请求
func HandleConnection(conn net.Conn) {
    defer func() {
        fmt.Println(conn.RemoteAddr())
        conn.Close()
    }()
    tempBuffer := make([]byte, 0)
    readerChannel := make(chan []byte, 16)
    //fmt.Println(conn.RemoteAddr())
    go reader(readerChannel)

    buffer := make([]byte, 1024)
    for {
        n, err := conn.Read(buffer)
        if err != nil {
            return
        }
        tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
    }
}

func reader(readerChannel chan []byte) {
    for {
        select {
        case data := <- readerChannel:
            //fmt.Println(string(data))
            data = data
        }
    }
}

这是几个用到的常量。

const MaxWorkers = 100000

const MaxQueue = 3000

const DataLen = 4

文章和实现参考了用go一分钟处理百万请求

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352