基于DAG(有向无环图)实现简单的流程引擎

设计目标:

  • 用 DAG(有向无环图) 描述任务依赖
  • goroutine + channel 并发调度
  • YAML / JSON / DSL 等流程定义方式
  • 插件化持久层(内存 / MySQL / Redis)
  • 支持 重试 / 超时 / 补偿

项目骨架:

image.png

DAG实现:

package scheduler

import "fmt"

type DAG struct {
    adj   map[string][]string // 邻接表
    indeg map[string]int      //入度
}

func NewDAG() *DAG { return &DAG{adj: make(map[string][]string), indeg: make(map[string]int)} }

func (d *DAG) AddEdge(from, to string) {

    // 自动补全缺失顶点
    d.AddVertex(from)
    d.AddVertex(to)

    d.adj[from] = append(d.adj[from], to)
    d.indeg[to]++
}

// 添加顶点(如果已存在则忽略)
func (g *DAG) AddVertex(v string) {
    if _, ok := g.adj[v]; !ok {
        g.adj[v] = nil
    }
    if _, ok := g.indeg[v]; !ok {
        g.indeg[v] = 0
    }
}

// 拓扑排序(Kahn/BFS 法),O(V+E) 时间复杂度,O(V) 空间复杂度。
func (d *DAG) Topological() ([]string, error) {
    q := []string{}
    //统计每个顶点的入度。把入度 0 的顶点入队。
    for id, d := range d.indeg {
        if d == 0 {
            q = append(q, id)
        }
    }
    order := []string{}

    //每弹出一个顶点,将其邻接顶点的入度减 1;若减到 0 则入队。
    for len(q) > 0 {
        cur := q[0]
        q = q[1:]
        order = append(order, cur)
        for _, nxt := range d.adj[cur] {
            d.indeg[nxt]--
            if d.indeg[nxt] == 0 {
                q = append(q, nxt)
            }
        }
    }
    //若最终弹出的顶点数 不等于 总顶点数,说明有环。
    if len(order) != len(d.indeg) {
        return nil, fmt.Errorf("cycle detected")
    }
    return order, nil
}

adj : 邻接表存储每个节点的下游节点,
indeg : 定义每个节点的入度。

有向无环图是否有环路的检测方法:

  • 拓扑排序(Kahn/BFS 法)
  • DFS+三色标记(Tarjan/DFS 法)

拓扑排序(Kahn/BFS 法)的思路

  1. 统计每个顶点的入度。
  2. 把入度 0 的顶点入队。
  3. 每弹出一个顶点,将其邻接顶点的入度减 1;若减到 0 则入队。
  4. 若最终弹出的顶点数 < 总顶点数,说明有环。

Task & WorkFolw 定义:

type Task struct {
    ID      string                 `json:"id"`
    Action  string                 `json:"action"` // 业务函数名
    Params  map[string]interface{} `json:"params"`
    Deps    []string               `json:"deps"`    //依赖任务, 只有这些依赖任务全部完成后,才能执行当前任务
    Timeout int                    `json:"timeout"` // 秒
    Retry   int                    `json:"retry"`
}

type Workflow struct {
    ID    string `json:"id"`
    Name  string `json:"name"`
    Tasks []Task `json:"tasks"`
}

Deps : 用于描述当前依赖的任务, 只有这些依赖任务全部完成后,才能执行当前任务

scheduler 调度器:

scheduler ,

package scheduler

import (
    "context"
    "go-lab/app/go_dag/model"
    "go-lab/app/go_dag/store"
    "sync"
    "time"
)

type Scheduler struct {
    handlers map[string]TaskHandler
    store    store.Store
}

func NewScheduler(st store.Store) *Scheduler {
    return &Scheduler{
        handlers: make(map[string]TaskHandler, 0),
        store:    st,
    }
}

// 无状态的任务处理器
type TaskHandler func(ctx context.Context, params map[string]interface{}) error

func (s *Scheduler) Register(name string, h TaskHandler) {
    if s.handlers == nil {
        s.handlers = make(map[string]TaskHandler)
    }
    s.handlers[name] = h
}

func (s *Scheduler) Run(ctx context.Context, wf *model.Workflow) error {
    
 //基于workflow 中的描述信息, 构建 DAG, 使用DAG来描述任务依赖
d := NewDAG()
    taskMap := make(map[string]*model.Task)
    for _, t := range wf.Tasks {
        taskMap[t.ID] = &t
        d.AddVertex(t.ID)
        for _, dep := range t.Deps {
            d.AddEdge(dep, t.ID)
        }
    }
          // 校验是否有环
    order, err := d.Topological()
    if err != nil {
        return err
    }

    // 剩余入度 & 完成标记
    remain := make(map[string]int)
    for _, id := range order {
        remain[id] = d.indeg[id]
    }
    done := make(chan string, len(order)) // 带缓冲,防止阻塞

        // 基于等待组和信号量控制, 当前节点任务执行前, 所有依赖的节点任务已执行完毕
    var wg sync.WaitGroup
    cond := sync.NewCond(&sync.Mutex{})

    // 1. 先把入度为 0 的任务放出去
    for _, id := range order {
        if remain[id] == 0 {
            wg.Add(1)
            go func(tid string) {
                defer wg.Done()
                s.runTask(ctx, taskMap[tid])
                done <- tid
            }(id)
        }
    }

    // 2. 监听完成事件,唤醒下游
    go func() {
        for fin := range done {
            cond.L.Lock()
            for _, nxt := range d.adj[fin] {
                remain[nxt]--
                if remain[nxt] == 0 {
                    wg.Add(1)
                    go func(tid string) {
                        defer wg.Done()
                        s.runTask(ctx, taskMap[tid])
                        done <- tid
                    }(nxt)
                }
            }
            cond.L.Unlock()
        }
    }()

    wg.Wait()
    close(done)
    return nil
}

func (s *Scheduler) runTask(ctx context.Context, t *model.Task) {
    for attempt := 0; attempt <= t.Retry; attempt++ {
        c, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Second)
        err := s.handlers[t.Action](c, t.Params)
        cancel()
        if err == nil {
            break
        }
        if attempt == t.Retry {
            // 补偿逻辑
        }
    }
}


测试:

1: 用yaml 定义workflow :

# workflow.yaml
id:   demo
name: 用户注册
tasks:
  - id: validate
    action: validateUser
    deps: []
    timeout: 5
  - id: saveUser
    action: saveUser
    deps: [validate]
    timeout: 10
  - id: sendEmail
    action: sendEmail
    deps: [saveUser]
    timeout: 3
    retry: 2

2: 加载workflow 并执行:

package main

import (
    "context"
    "fmt"
    "go-lab/app/go_dag/model"
    "go-lab/app/go_dag/scheduler"
    "go-lab/app/go_dag/store"
    "gopkg.in/yaml.v2"
    "os"
)

func main() {
    // 1. 加载流程
    data, _ := os.ReadFile("./app/go_dag/example/workflow.yaml")
    var wf model.Workflow
    _ = yaml.Unmarshal(data, &wf)

    // 2. 注册任务实现
    s := scheduler.NewScheduler(store.NewMemStore())
    s.Register("validateUser", validateUser)
    s.Register("saveUser", saveUser)
    s.Register("sendEmail", sendEmail)

    // 3. 运行
    s.Run(context.Background(), &wf)

}

func validateUser(ctx context.Context, p map[string]interface{}) error {
    // TODO: 参数校验

    fmt.Println("do task validateUser")
    return nil
}
func saveUser(ctx context.Context, p map[string]interface{}) error {
    // TODO: 写库

    fmt.Println("do task saveUser")
    return nil
}
func sendEmail(ctx context.Context, p map[string]interface{}) error {
    // TODO: 发邮件

    fmt.Println("do task sendEmail")
    return nil
}

3: 执行结果


image.png
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容