基于生产者-消费者的设计。
任务数据结构 Task 自带方法 exec() 负责执行Task任务的任意函数 f(),Pool的run()开启N个消费者协程监听共享无缓冲channel,生产者协程封装数据id和函数t到数据结构Task,导入channel执行。
package main
import (
"fmt"
"time"
"math/rand"
)
type Task struct {
ProducerId int
TaskId int
f func() time.Time
}
func NewTask(id int, taskid int,f func() time.Time) *Task {
return &Task{
ProducerId: id,
TaskId: taskid,
f: f,
}
}
type Pool struct {
workerNum int
workerChan chan *Task
}
func NewPool(num int) *Pool {
return &Pool{
workerNum: num,
workerChan: make(chan *Task),
}
}
func (p *Pool) worker(id int) {
for task := range p.workerChan {
fmt.Println(task.f(),": ConsumerId:", id, "ProducerId:", task.ProducerId, "TaskId:", task.TaskId,"is done")
}
}
func (p *Pool) Run() {
for i := 0; i < p.workerNum; i++ {
go p.worker(i)
}
}
func task() time.Time {
return time.Now()
}
func main() {
p := NewPool(10)
p.Run()
for producerId :=0; producerId < 15; producerId++ {
go func(producerId int){
for i := 0; i < 50; i++ {
p.workerChan<-NewTask(producerId,rand.Int(),task)
}
}(producerId)
}
time.Sleep(3*time.Second)
}