设计目标:
- 用 DAG(有向无环图) 描述任务依赖
- goroutine + channel 并发调度
- YAML / JSON / DSL 等流程定义方式
- 插件化持久层(内存 / MySQL / Redis)
- 支持 重试 / 超时 / 补偿
项目骨架:

image.png
DAG实现:
package scheduler
import "fmt"
type DAG struct {
adj map[string][]string // 邻接表
indeg map[string]int //入度
}
func NewDAG() *DAG { return &DAG{adj: make(map[string][]string), indeg: make(map[string]int)} }
func (d *DAG) AddEdge(from, to string) {
// 自动补全缺失顶点
d.AddVertex(from)
d.AddVertex(to)
d.adj[from] = append(d.adj[from], to)
d.indeg[to]++
}
// 添加顶点(如果已存在则忽略)
func (g *DAG) AddVertex(v string) {
if _, ok := g.adj[v]; !ok {
g.adj[v] = nil
}
if _, ok := g.indeg[v]; !ok {
g.indeg[v] = 0
}
}
// 拓扑排序(Kahn/BFS 法),O(V+E) 时间复杂度,O(V) 空间复杂度。
func (d *DAG) Topological() ([]string, error) {
q := []string{}
//统计每个顶点的入度。把入度 0 的顶点入队。
for id, d := range d.indeg {
if d == 0 {
q = append(q, id)
}
}
order := []string{}
//每弹出一个顶点,将其邻接顶点的入度减 1;若减到 0 则入队。
for len(q) > 0 {
cur := q[0]
q = q[1:]
order = append(order, cur)
for _, nxt := range d.adj[cur] {
d.indeg[nxt]--
if d.indeg[nxt] == 0 {
q = append(q, nxt)
}
}
}
//若最终弹出的顶点数 不等于 总顶点数,说明有环。
if len(order) != len(d.indeg) {
return nil, fmt.Errorf("cycle detected")
}
return order, nil
}
adj : 邻接表存储每个节点的下游节点,
indeg : 定义每个节点的入度。
有向无环图是否有环路的检测方法:
- 拓扑排序(Kahn/BFS 法)
- DFS+三色标记(Tarjan/DFS 法)
拓扑排序(Kahn/BFS 法)的思路
- 统计每个顶点的入度。
- 把入度 0 的顶点入队。
- 每弹出一个顶点,将其邻接顶点的入度减 1;若减到 0 则入队。
- 若最终弹出的顶点数 < 总顶点数,说明有环。
Task & WorkFolw 定义:
type Task struct {
ID string `json:"id"`
Action string `json:"action"` // 业务函数名
Params map[string]interface{} `json:"params"`
Deps []string `json:"deps"` //依赖任务, 只有这些依赖任务全部完成后,才能执行当前任务
Timeout int `json:"timeout"` // 秒
Retry int `json:"retry"`
}
type Workflow struct {
ID string `json:"id"`
Name string `json:"name"`
Tasks []Task `json:"tasks"`
}
Deps : 用于描述当前依赖的任务, 只有这些依赖任务全部完成后,才能执行当前任务
scheduler 调度器:
scheduler ,
package scheduler
import (
"context"
"go-lab/app/go_dag/model"
"go-lab/app/go_dag/store"
"sync"
"time"
)
type Scheduler struct {
handlers map[string]TaskHandler
store store.Store
}
func NewScheduler(st store.Store) *Scheduler {
return &Scheduler{
handlers: make(map[string]TaskHandler, 0),
store: st,
}
}
// 无状态的任务处理器
type TaskHandler func(ctx context.Context, params map[string]interface{}) error
func (s *Scheduler) Register(name string, h TaskHandler) {
if s.handlers == nil {
s.handlers = make(map[string]TaskHandler)
}
s.handlers[name] = h
}
func (s *Scheduler) Run(ctx context.Context, wf *model.Workflow) error {
//基于workflow 中的描述信息, 构建 DAG, 使用DAG来描述任务依赖
d := NewDAG()
taskMap := make(map[string]*model.Task)
for _, t := range wf.Tasks {
taskMap[t.ID] = &t
d.AddVertex(t.ID)
for _, dep := range t.Deps {
d.AddEdge(dep, t.ID)
}
}
// 校验是否有环
order, err := d.Topological()
if err != nil {
return err
}
// 剩余入度 & 完成标记
remain := make(map[string]int)
for _, id := range order {
remain[id] = d.indeg[id]
}
done := make(chan string, len(order)) // 带缓冲,防止阻塞
// 基于等待组和信号量控制, 当前节点任务执行前, 所有依赖的节点任务已执行完毕
var wg sync.WaitGroup
cond := sync.NewCond(&sync.Mutex{})
// 1. 先把入度为 0 的任务放出去
for _, id := range order {
if remain[id] == 0 {
wg.Add(1)
go func(tid string) {
defer wg.Done()
s.runTask(ctx, taskMap[tid])
done <- tid
}(id)
}
}
// 2. 监听完成事件,唤醒下游
go func() {
for fin := range done {
cond.L.Lock()
for _, nxt := range d.adj[fin] {
remain[nxt]--
if remain[nxt] == 0 {
wg.Add(1)
go func(tid string) {
defer wg.Done()
s.runTask(ctx, taskMap[tid])
done <- tid
}(nxt)
}
}
cond.L.Unlock()
}
}()
wg.Wait()
close(done)
return nil
}
func (s *Scheduler) runTask(ctx context.Context, t *model.Task) {
for attempt := 0; attempt <= t.Retry; attempt++ {
c, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Second)
err := s.handlers[t.Action](c, t.Params)
cancel()
if err == nil {
break
}
if attempt == t.Retry {
// 补偿逻辑
}
}
}
测试:
1: 用yaml 定义workflow :
# workflow.yaml
id: demo
name: 用户注册
tasks:
- id: validate
action: validateUser
deps: []
timeout: 5
- id: saveUser
action: saveUser
deps: [validate]
timeout: 10
- id: sendEmail
action: sendEmail
deps: [saveUser]
timeout: 3
retry: 2
2: 加载workflow 并执行:
package main
import (
"context"
"fmt"
"go-lab/app/go_dag/model"
"go-lab/app/go_dag/scheduler"
"go-lab/app/go_dag/store"
"gopkg.in/yaml.v2"
"os"
)
func main() {
// 1. 加载流程
data, _ := os.ReadFile("./app/go_dag/example/workflow.yaml")
var wf model.Workflow
_ = yaml.Unmarshal(data, &wf)
// 2. 注册任务实现
s := scheduler.NewScheduler(store.NewMemStore())
s.Register("validateUser", validateUser)
s.Register("saveUser", saveUser)
s.Register("sendEmail", sendEmail)
// 3. 运行
s.Run(context.Background(), &wf)
}
func validateUser(ctx context.Context, p map[string]interface{}) error {
// TODO: 参数校验
fmt.Println("do task validateUser")
return nil
}
func saveUser(ctx context.Context, p map[string]interface{}) error {
// TODO: 写库
fmt.Println("do task saveUser")
return nil
}
func sendEmail(ctx context.Context, p map[string]interface{}) error {
// TODO: 发邮件
fmt.Println("do task sendEmail")
return nil
}
3: 执行结果

image.png