quartz源码阅读

前面的话

这里只对quartz的源码做一个整体的梳理,关于quartz的整体结构,百度Google之,一堆一堆的。

具体阅读

quartz中主要围绕3个东东搞各种逻辑。分别是调度器(Scheduler),触发器(trigger)和任务(job)。调度器去获取触发器,触发器指定任务的调度时间,调度策略,调度状态,优先级,开始时间,结束时间等信息。任务就是具体的业务逻辑实现。

一个栗子进入代码

        SchedulerFactory factory = new StdSchedulerFactory("test_quartz.properties");
        Scheduler scheduler = factory.getScheduler();
        scheduler.start();
        Trigger t = newTrigger().withIdentity("t1","g1").startAt(new Date(1466746025000l)).withSchedule(simpleSchedule().withMisfireHandlingInstructionNextWithRemainingCount().withRepeatCount(0)).build();
        JobDetail job = newJob(TestJob.class).withIdentity("myJob1", "g1").build();
        scheduler.scheduleJob(job,t);
    }
public class TestJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            Thread.sleep(20000);
            System.out.println(context.getTrigger().getKey()+"执行成功!!!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上面两段代码就是一个简单的任务的写法。
主要过程如下:
1、首先通过调度器工厂获取一个调度器。启动调度器。
2、定义触发器。
3、定义任务。
4、通过调度器将触发器和任务关联起来。
首先来看下调度器的初始化。
调度器工厂初始化主要是读取配置信息。通过getScheduler方法才是真正的初始化scheduler,里边主要是通过配置信息组装scheduler。这里不是重点,一笔带过。【注:在调度器组装的时候,顺便启动了任务的执行线程
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);

,只是在线程启动后一直等待,知道调度器调用start方法

while (paused && !halted.get()) {
                      try {
                          // wait until togglePause(false) is called...
                          sigLock.wait(1000L);
                      } catch (InterruptedException ignore) {
                      }

下面试调度器的启动。

调度器启动过程

quartz支持集群模式下的任务调度。任务持久化采用DB的方式。
这里主要涉及集群模式下的任务执行过程。
启动过程代码如下:

 public void start() throws SchedulerException {
        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        notifySchedulerListenersStarting();
        if (initialStart == null) {
            initialStart = new Date();
    //调度器第一次启动
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
          
            resources.getJobStore().schedulerResumed();
        }
    //将执行线程唤醒。用于获取触发器,触发任务
        schedThread.togglePause(false);
        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");        
        notifySchedulerListenersStarted();
    }
public void schedulerStarted() throws SchedulerException {

//如果是集群,启动集群的管理线程,定时检查集群的健康性。
        if (isClustered()) {
            clusterManagementThread = new ClusterManager();
            if(initializersLoader != null)
                clusterManagementThread.setContextClassLoader(initializersLoader);
            clusterManagementThread.initialize();
        } else {
            try {
  //非集群,则恢复调度器宕机前的任务
                recoverJobs();
            } catch (SchedulerException se) {
                throw new SchedulerConfigException(
                        "Failure occured during job recovery.", se);
            }
        }
//起线程,用于检查是否有任务错过执行时间,若有,则根据不同的策略修改不同的nextfiretime值,以便于工作线程去选择trigger。
        misfireHandler = new MisfireHandler();
        if(initializersLoader != null)
            misfireHandler.setContextClassLoader(initializersLoader);
        misfireHandler.initialize();
        schedulerRunning = true;
        
        getLog().debug("JobStore background threads started (as scheduler was started).");
    }

下面深入对三大线程做讲解。QuartzSchedulerThread,ClusterManager和MisfireHandler。

QuartzSchedulerThread

这个线程是quartz的主要线程,负责调度的。看下代码:
run方法很长,这里选取主要的代码。

  public void run() {
        int acquiresFailed = 0;
        while (!halted.get()) {
            try {
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                           //scheduler调用start方法前,在此处循环,直到start方法里调用了togglePause方法。
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
。。。
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                    try {
                      //获取当前可以被触发的trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                     。。。

                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        if(goAhead) {
                            try {
                              //触发trigger,主要是修改qrtz_trigger表中trigger的状态从acquired状态变成WATTING或者complete状态。并计算下一次执行时间,等待下一次被选中。同时修改QRTZ_FIRED_TRIGGERS中trigger状态为executing状态。
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
。。。
                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            //真正的到了调用job的execute的地方了,该方法执行完成之后,本次调度就真正完成了。
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
 //后面就是该线程等待一段时间,用于其他节点来调度任务。
    }

总结起来,上面代码的逻辑如下:
1、启动时,该线程一直都在等待,知道有调用scheudler的start方法,开始唤醒该线程。
2、查看当前任务的处理线程池里空闲线程的个数,然后去qrtz_triggers表中获取可以处理的trigger,并将trigger的状态改为acquired,同时插入表qrtz_fired_triggers,此时qrtz_fired_triggers表中trigger的状态也为acquired。
3、获取到待执行的trigger,由于取的是时间窗里的trigger,所以,从待执行的trigger列表中取第一个trigger(trigger列表是按照next_fire_time升序排列),与当前时间比较,如果大于2s,则等待。
4、等到第一个trigger的任务到了,则去qrtz_triggers表中再次确认获取到的trigger的状态是否为aquired,若是,则修改qrtz_fired_triggers状态为executing。同时,qrtz_triggers中的状态在本次调度时已经走到尽头,可以等待下一次的调度了。即,计算下一次的调度时间,将并将任务状态改为watting状态。若计算得到的下一次调度时间为null,则表明该任务已经执行完成。将任务改为complete状态。返回待本次调度的trigger。
5、循环trigger,获取任务执行线程,执行任务的execute方法。
6、改调度线程wait一段时间,等待下一次获取trigger,调度。
接下来看下真正调度的线程JobRunShell,同样,很长的run方法,这里只摘取部分代码:

 public void run() {
        qs.addInternalSchedulerListener(this);
        try {
            OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
            JobDetail jobDetail = jec.getJobDetail();
            do {
                JobExecutionException jobExEx = null;
                Job job = jec.getJobInstance();
......
                long startTime = System.currentTimeMillis();
                long endTime = startTime;
                try {
                    job.execute(jec);
                    endTime = System.currentTimeMillis();
                } catch (JobExecutionException jee) {
                    endTime = System.currentTimeMillis();
                    jobExEx = jee;
                    getLog().info("Job " + jobDetail.getKey() +
                            " threw a JobExecutionException: ", jobExEx);
                } catch (Throwable e) {
                    endTime = System.currentTimeMillis();
                    getLog().error("Job " + jobDetail.getKey() +
                            " threw an unhandled Exception: ", e);
                    SchedulerException se = new SchedulerException(
                            "Job threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError("Job ("
                            + jec.getJobDetail().getKey()
                            + " threw an exception.", se);
                    jobExEx = new JobExecutionException(se, false);
                }

                jec.setJobRunTime(endTime - startTime);
......
                CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
                try {
                    instCode = trigger.executionComplete(jec, jobExEx);
                } catch (Exception e) {
                    // If this happens, there's a bug in the trigger...
                    SchedulerException se = new SchedulerException(
                            "Trigger threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError(
                            "Please report this error to the Quartz developers.",
                            se);
                }
                if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                    jec.incrementRefireCount();
                    try {
                        complete(false);
                    } catch (SchedulerException se) {
                        qs.notifySchedulerListenersError("Error executing Job ("
                                + jec.getJobDetail().getKey()
                                + ": couldn't finalize execution.", se);
                    }
                    continue;
                }

                try {
                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                    continue;
                }

                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                break;
            } while (true);

        } finally {
            qs.removeInternalSchedulerListener(this);
        }
    }

上述代码的逻辑很简单,就是获取job并执行job的execute方法。执行完成之后,通过不同的返回码,进行不同的数据库操作。 qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);这句话就是通过不同的返回值做不同的数据库操作。主要是修改qrtz_triggers里的trigger状态及某些场景下删除trigger。然后是删除qrtz_fired_triggers里的当前trigger。
到此,正常的任务调度完成了。当然其中很多步骤里都调用了SchedulerListener,TriggerListener中的一些方法,这些是quartz开放出来的定制接口,方便每步操作时,我们对任务的监控。

状态转换

接下来看misfired的进程MisfireHandler。

Misfirehandler是一个内部类。run接口 代码如下:

  public void run() {
            
            while (!shutdown) {

                long sTime = System.currentTimeMillis();

//获取misfired的job
                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

// 如果任务处理线程在等待下一次的扫描满足的trigger,则唤醒线程,来处理misfired的任务
                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }

                if (!shutdown) {
                    long timeToSleep = 50l;  // At least a short pause to help balance threads
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50l;
                        }

                        if(numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }//while !shutdown
            }
        }

主要逻辑在manager方法里。

 private RecoverMisfiredJobsResult manage() {
            try {
                getLog().debug("MisfireHandler: scanning for misfires...");

                RecoverMisfiredJobsResult res = doRecoverMisfires();
                numFails = 0;
                return res;
            } catch (Exception e) {
                if(numFails % 4 == 0) {
                    getLog().error(
                        "MisfireHandler: Error handling misfires: "
                                + e.getMessage(), e);
                }
                numFails++;
            }
            return RecoverMisfiredJobsResult.NO_OP;
        }

主要的方法是doRecoverMisfires()。

protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = getNonManagedTXConnection();
        try {
            RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
            
            // Before we make the potentially expensive call to acquire the 
            // trigger lock, peek ahead to see if it is likely we would find
            // misfired triggers requiring recovery.
            int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                getDelegate().countMisfiredTriggersInState(
                    conn, STATE_WAITING, getMisfireTime()) : 
                Integer.MAX_VALUE;
            
            if (misfireCount == 0) {
                getLog().debug(
                    "Found 0 triggers that missed their scheduled fire-time.");
            } else {
                transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                //修改misfired的next fired time ,等待任务选取线程去调度
                result = recoverMisfiredJobs(conn, false);
            }
            
            commitConnection(conn);
            return result;

int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate().countMisfiredTriggersInState( conn,STATE_WAITING,getMisfireTime()) :
获取misfired的trigger,执行的查询为select count(TRIGGER_NAME) from QRTZ_TRIGGERS where SCHED_NAME=xxx and not (MISFIRE_INSTR = -1 ) and NEXT_FIRE_TIME < 当前时间 and TRIGGER_STATE='STATE_WAITING'即选取当前调度器的misfired的策略不为-1的,且下一次执行时间小于当前时间的且状态为waiting的trigger。
result = recoverMisfiredJobs(conn, false);真正获取misfired的job的时候了。

protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
        throws JobPersistenceException, SQLException {

        // If recovering, we want to handle all of the misfired
        // triggers right away.
        int maxMisfiresToHandleAtATime = 
            (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
        
        List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
        long earliestNewTime = Long.MAX_VALUE;
        // We must still look for the MISFIRED state in case triggers were left 
        // in this state when upgrading to this version that does not support it. 
        boolean hasMoreMisfiredTriggers =
            getDelegate().hasMisfiredTriggersInState(
                conn, STATE_WAITING, getMisfireTime(), 
                maxMisfiresToHandleAtATime, misfiredTriggers);

        if (hasMoreMisfiredTriggers) {
            getLog().info(
                "Handling the first " + misfiredTriggers.size() +
                " triggers that missed their scheduled fire-time.  " +
                "More misfired triggers remain to be processed.");
        } else if (misfiredTriggers.size() > 0) { 
            getLog().info(
                "Handling " + misfiredTriggers.size() + 
                " trigger(s) that missed their scheduled fire-time.");
        } else {
            getLog().debug(
                "Found 0 triggers that missed their scheduled fire-time.");
            return RecoverMisfiredJobsResult.NO_OP; 
        }

        for (TriggerKey triggerKey: misfiredTriggers) {
            
            OperableTrigger trig = 
                retrieveTrigger(conn, triggerKey);

            if (trig == null) {
                continue;
            }

            doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

            if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
                earliestNewTime = trig.getNextFireTime().getTime();
        }

        return new RecoverMisfiredJobsResult(
                hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
    }

每次获取misfired的trigger有一定的数量,默认20个,超过20个,则会在下一次去获取。
处理misfired的triggerdoUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

 private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
        Calendar cal = null;
        if (trig.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trig.getCalendarName());
        }

//触发triggerlistener的misfired方法。
        schedSignaler.notifyTriggerListenersMisfired(trig);

//根据不同的misfired的策略计算next_fired_time。【比较的绕,下次详细介绍】
        trig.updateAfterMisfire(cal);

        if (trig.getNextFireTime() == null) {
//如果下一次执行的时间为空,则认为执行的任务已经完成了。直接修改状态的state_complete
            storeTrigger(conn, trig,
                null, true, STATE_COMPLETE, forceState, recovering);
            schedSignaler.notifySchedulerListenersFinalized(trig);
        } else {
//修改任务的下一次执行时间和任务状态,等待调度线程去调度
            storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                    forceState, recovering);
        }
    }

执行完上面的代码,提交了数据库事物后,任务就可以被正常调度了。到这里,misfired的任务大体的也完成。
然后就是回到manager方法了。唤醒调度线程,至此,misfired的本次扫描全部完成,接下来的事情就交给QuartzSchedulerThread来处理了。
写了好几天,终于写完了两个线程的处理过程。接下来的一篇主要介绍任务节点在down机的时候的处理及不同情况下trigger的next_fire_time的计算。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容

  • 概述 了解Quartz体系结构 Quartz对任务调度的领域问题进行了高度的抽象,提出了调度器、任务和触发器这3个...
    张晨辉Allen阅读 2,220评论 2 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • ** 版本:2.2.1 ** Hello world: 调度器: 任务详情:任务体实现Job接口 触发器: 执行调...
    Coselding阅读 10,146评论 12 38
  • 老李去扶贫,这是组织决定的。 清早来到车库,他犯难了,开哪辆车呢?老李有四辆车,第一辆普桑,他三十...
    日大侠阅读 232评论 0 0
  • 有一个高中同学,年薪如何如何,房子车子如何如何,这么多年偶尔给我电话,每次唠叨的事情都一样,不同的人生阶段不同的收...
    by_10阅读 362评论 1 1