19-定时任务库“robfig/cron”核心源码解读

定时任务是项目中一个比较常见的情景。那么,今天我们就来介绍一个github上还不错的定时任务库“github.com/robfig/cron.git/v3”。

拉取库文件:

  go get github.com/robfig/cron/v3@v3.0.0

demo示例:

package main

import (
    "fmt"
    "time"
    "github.com/robfig/cron/v3"
)

//定时任务
func timedTask() {
    fmt.Printf("执行定时任务: %s \n", time.Now().Format("2006-01-02 15:04:05"))
}

func main() {
    //初始化一个cron对象
    c := cron.New()

    //方法一:通过AddFunc注册,任务调度
    spec := "50,51 17 * * *"  //每天17点50分,51分执行
    //spec := "@every 3s"   //每3秒执行一次(秒数可以超过60s)
    //spec := "CRON_TZ=Asia/Tokyo 30 04 * * *"
    
    //参数一:调度的时间策略,参数二:到时间后执行的方法。
    enterId, err := c.AddFunc(spec, timedTask)
    if err != nil {
        panic(err)
    }
    fmt.Printf("任务id是 %d \n", enterId)

    //启动
    c.Start()

    //用于阻塞 后面可以使用 select {} 阻塞
    time.Sleep(time.Hour * 9)

    //关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
    c.Stop()
}
 //方法二:通过AddJob注册
   type myType int
   func (c myType) Run() {
      fmt.Println("myType 实现了 Run方法")
      return
   }
   
   var dataNew myType = 10
   c.AddJob("@every 5s", dataNew)
   //调用方法AddJob(spec string, cmd Job)也可以实现AddFunc注册的功能,
   //Job是interface,需要入参类型实现方法:Run()。实际上,
   //方法AddFunc内部将参数cmd 进行了包装(wrapper),然后也是调用方法AddJob进行注册。

源码分析:

  • 基本数据结构体:(cron.go)
  //Cron数据结构
  type Cron struct {
    entries   []*Entry  //调度执行实体列表(或job的指针对象)
    chain     Chain     //chain用来定义Entry里的warppedJob使用的逻辑
    stop      chan struct{}  //停止所有调度任务
    add       chan *Entry    //添加一个调度任务
    remove    chan EntryID   //移除一个调度任务
    snapshot  chan chan []Entry  //运行中的调度任务
    running   bool  //代表是否已经在执行,用于操作整个cron对象只启动一次
    logger    Logger  //记录日志信息
    runningMu sync.Mutex //协程锁,确保运行数据安全,比如增加或移除entry
    location  *time.Location   // 时区
    parser    ScheduleParser  //对时间格式进行解析
    nextID    EntryID   //下一个任务的id
    jobWaiter sync.WaitGroup //run task时进行add(1),结束时done(),以此保证所有job都能退出
  }
  
  // Entry 数据结构,每一个被调度实体一个
  type Entry struct {
    // 唯一id,用于查询和删除,默认是自增的
       ID EntryID 
    //本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
       Schedule Schedule
    // 本entry下次需要执行的绝对时间,会一直被更新
    // 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
    // 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
       Next time.Time 
    //上一次被执行时间,主要用来查询
       Prev time.Time 
    //WrappedJob 是真实执行的Job实体(执行的任务)
       WrappedJob Job
    //Job主要给用户查询
       Job Job
  }
  • 核心函数run() :
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
    c.logger.Info("start")
  
    // Figure out the next activation times for each entry.
    //获取当前时间
    now := c.now()
    //循环调入任务,计算下一次任务的执行时间
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    }
      
      //第一层死循环,无限循环
    for {
        // Determine the next entry to run.
        // 按时间先后排队调度任务
        sort.Sort(byTime(c.entries))
  
        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            // 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,
            //用于阻塞下面的 select{} ,因为`select`是多路复用,
            //其他channel能返回数据时,select就回执行不会阻塞。
            // 所以当没有任务时,启动Start()程序 就会被这个阻塞
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            //如果有调度信息,就 sleep 调度任务中第一个的 循环时间 
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }
  
          // 第二层死循环,内部使用select{}阻塞
        for {
            select {
            case now = <-timer.C:  //上一步中的 timer sleep时间如果到了就执行
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)
  
                // Run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }
  
            case newEntry := <-c.add: //向Cron中添加了 一个调度任务就会执行
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
  
            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue
                   
            case <-c.stop:   // 停止定时任务
                timer.Stop()
                c.logger.Info("stop")
                return
  
            case id := <-c.remove: // 移除任务
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }
               //当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
            break
        }
    }
}
  
  • 时间书写格式:
CRON Expression Format (CRON表达式格式):
Field name Mandatory? Allowed values Allowed special characters
Minutes Yes 0-59 * / , -
Hours Yes 0-23 * / , -
Day of month Yes 1-31 * / , - ?
Month Yes 1-12 or JAN-DEC * / , -
Day of week Yes 0-6 or SUN-SAT * / , - ?
Predefined schedules( 预定义的时间表):
Entry(输入) Description(描述) Equivalent To(等于)
@yearly (or @annually) Run once a year, midnight, Jan. 1st 0 0 1 1 *
@monthly Run once a month, midnight, first of month 0 0 1 * *
@weekly Run once a week, midnight between Sat/Sun 0 0 * * 0
@daily (or @midnight) Run once a day, midnight 0 0 * * *
@hourly Run once an hour, beginning of hour 0 * * * *

欠缺的地方:

  • 不支持持久化,重启一下服务,调度任务信息就没了,需要自己存储调度信息。
  • 不支持一次定时,若想实现此功能,调用一次之后再调用移除可以实现。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容