Go定时任务源码 - robfig/cron

介绍

robfig/cron是Go语言实现的开源定时任务调度框架,核心代码是巧妙的使用chan + select + for实现了一个轻量级调度协程,不但语法简洁,而且具有很好的性能。

设计

任务抽象(业务隔离):任务抽象成一个Job接口,业务逻辑类只需实现该接口

type Job interface {
  Run()
}

计划接口:通过当前时间计算任务的下次执行执行时间,具体实现类可以根据实际需求实现

type Schedule interface {
    Next(time.Time) time.Time
}

定时任务对象:保存执行的任务Job、计算执行时间

type Entry struct {
    ID       EntryID   // id
    Schedule Schedule  // 计划
    Next     time.Time // 下次执行时间
    Job      Job       // 任务
}

任务调度管理:保存定时任务对象(Entry),调度任务执行,提供新增、删除接口(涉及关联资源竞争)

// 任务管理类
type Cron struct {
    nextID  int64        // 生成entry自增ID
    entries []*Entry     // 保存Entry
    add     chan *Entry  // 添加
    remove  chan EntryID // 删除
}
// 删除
func (c *Cron) Remove(id EntryID) { 
    c.remove <- id
}
// 新增
func (c *Cron) Add(spec string, cmd Job) EntryID  { 
    entry := &Entry{
        ID:         EntryID(atomic.AddInt64(&c.nextID, 1)),
        Schedule:   ParseStandard(spec),
        Job:        cmd,
    }
    c.add <- entry
    return entry.ID
}

核心调度:计算下次执行时间 -> 排序 -> 取最早执行数据 -> timer 等待,因为只有一个协程在执行这个run的调度,所以不存在资源竞争,不需要加锁,另外考虑到执行任务可能涉及阻塞,例如:IO操作,所以一般startJob方法会开启协程执行

func (c *Cron) run() {
    now := time.Now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now) // 计算下次执行时间
    }
    for {
        sort.Sort(byTime(c.entries)) // 时间排序
        timer := time.NewTimer(c.entries[0].Next.Sub(now))
        select {
        case now = <-timer.C:
            for _, e := range c.entries {
                if e.Next.After(now) || e.Next.IsZero() {
                    break
                }
                c.startJob(e.Job) // 开协程执行
                e.Next = e.Schedule.Next(now) // 计算下次执行时间
            }
        case newEntry := <-c.add: // 新增
            timer.Stop()
            newEntry.Next = newEntry.Schedule.Next(now)
            c.entries = append(c.entries, newEntry)
        }
    ...
    }
}
// 执行任务
func (c *Cron) startJob(j Job) {
    go func() {
        j.Run()
    }()
}

启动时会开启唯一协程执行run方法,计算任务执行时间,执行,任务管理等

func New() *Cron {
    c := &Cron{
        entries: nil,
        add:     make(chan *Entry),
        remove:  make(chan EntryID),
    }
    return c
}
func (c *Cron) Start() {
    go c.run()
}

总结

  1. 共享资源(定时任务)的管理和调度由唯一协程管理
  2. 通过for + select + channel来循环计算执行时间,监听任务到期、增删事件
  3. 执行任务会新启协程执行,不阻塞调度
  4. 采用扇入/扇出原理,多协程添加、增删任务调度协程(Fan In),调度启动新协程执行任务(Fan Out)
  5. 调度协程使用的是CSP并发模型思想

我的博客:https://itart.cn
原文地址:https://itart.cn/blogs/2022/explore/cron-source-code.html

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容