调度执行--quartz核心线程类

QuartzSchedulerThread

调度器初始化后构造了核心调度线程,quartz整个作业获取执行均由此线程执行

调度过程:
1.等待线程池内的可用线程(此处如果线程池繁忙的话,会造成调度延迟或者失败)
2.查询本次可执行的触发器,通过now+idleWaitTime确定可以获取的Triggers的时间范围,获取的数量由Math.min(availThreadCount,qsRsrcs.getMaxBatchSize())最小值决定(此处可以通过配置优化,默认的batchSize是1)
3.告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
4.循环触发此次作业,作业方法的调用是通过JobRunShell进行的,将构造出的每一个JobRunShell放入线程池等待调度

public void run(){
    boolean lastAcquireFailed=false;

    while(!halted.get()){
        try{
                 //省略代码…检查调度器是否暂停或停止
        //等待可用线程,此方法阻塞直到线程池有可用线程为止
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        if(availThreadCount>0){
            //willalwaysbetrue,duetosemanticsofblockForAvailableThreads...
        
            List<OperableTrigger> triggers=null;
            
            Long now=System.currentTimeMillis();
            
            clearSignaledSchedulingChange();
            
            try{
                //查询本次执行的触发器。
                //参数(1)下一次间隔时间是now+idleWaitTime(此变量用来控制获取每一批触发器的间隔,设置过小导致频繁查询)
                //参数(2)查询数量为 可用线程数或配置的最大批 两者最小
                triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now+idleWaitTime,Math.min(availThreadCount,qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
                lastAcquireFailed = false;
            }catch(JobPersistenceExceptionjpe){
                if(!lastAcquireFailed){
                    qs.notifySchedulerListenersError(
                    "Anerroroccurredwhilescanningforthenexttriggerstofire.",
                    jpe);
                }
                lastAcquireFailed=true;
                continue;
                }catch(RuntimeExceptione){
                if(!lastAcquireFailed){
                    getLog().error("quartzSchedulerThreadLoop:RuntimeException"
                    +e.getMessage(),e);
                }
                lastAcquireFailed=true;
                continue;
            }
    
        if(triggers!=null&&!triggers.isEmpty()){
        
    //省略代码…此处检查调度器是否到执行时间,与系统时间判断,只有时间差小于2毫秒才可继续执行
        if(goAhead){
        try{
            //告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
            if(res!=null)
                bndles=res;
        }catch(SchedulerExceptionse){
            qs.notifySchedulerListenersError(
            "Anerroroccurredwhilefiringtriggers'"
            +triggers+"'",se);
            //QTZ-179:aproblemoccurredinteractingwiththetriggersfromthedb
            //wereleasethemandloopagain
            for(inti=0;i<triggers.size();i++){
            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
            }
            continue;
        }
        
        }
    
        //循环触发此次作业
        for(inti=0;i<bndles.size();i++){
            TriggerFiredResult result=bndles.get(i);
            TriggerFiredBundle bndle=result.getTriggerFiredBundle();
            Exception exception=result.getException();
            
            if(exception instanceof RuntimeException){
                getLog().error("RuntimeExceptionwhilefiringtrigger"+triggers.get(i),exception);
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
        
            //it's possible to get 'null' if the triggers was paused,
            //blocked,or other similar occurrences that prevent it being
            //fired at this time...o rif the scheduler was shutdown(halted)
            if(bndle==null){
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
        
            JobRunShell shell=null;
            try{
                //构造可执行作业,并初始化,真正执行作业是在这个类
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            }catch(SchedulerExceptionse){
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }
            //放入线程池中 执行作业!!
            if(qsRsrcs.getThreadPool().runInThread(shell)==false){
                //thiscaseshouldneverhappen,asitisindicativeofthe
                //schedulerbeingshutdownorabuginthethreadpoolor
                //athreadpoolbeingusedconcurrently-whichthedocs
                //saynottodo...
                getLog().error("ThreadPool.runInThread()returnfalse!");
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            }
    
        }
        
        continue;//while(!halted)
        }
    }else{//if(availThreadCount>0)
    //shouldneverhappen,ifthreadPool.blockForAvailableThreads()followscontract
    continue;//while(!halted)
    }
    
    longnow=System.currentTimeMillis();
    longwaitTime=now+getRandomizedIdleWaitTime();
    longtimeUntilContinue=waitTime-now;
    synchronized(sigLock){
    try{
    if(!halted.get()){
    //QTZ-336Ajobmighthavebeencompletedinthemeantimeandwemighthave
    //missedthescheduledchangedsignalbynotwaitingforthenotify()yet
    //Checkthatbeforewaitingfortoolongincasethisveryjobneedstobe
    //scheduledverysoon
    if(!isScheduleChanged()){
    sigLock.wait(timeUntilContinue);
    }
    }
    }catch(InterruptedExceptionignore){
    }
    }
    
    }catch(RuntimeExceptionre){
    getLog().error("Runtimeerroroccurredinmaintriggerfiringloop.",re);
    }
    }//while(!halted)
    
    //dropreferencestoschedulerstufftoaidgarbagecollection...
    qs=null;
    qsRsrcs=null;
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 本文是根据 Quartz定时器官方文档翻译的,只翻译了第1到第10课,如有翻译不精确的地方,请读者指正,互相学习,...
    ChinaXieShuai阅读 12,710评论 1 19
  • ** 版本:2.2.1 ** Hello world: 调度器: 任务详情:任务体实现Job接口 触发器: 执行调...
    Coselding阅读 13,454评论 12 38
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 10,536评论 0 23
  • 有时候需要狠狠摔一跤,你才能知道你的位置。 世态炎凉,你以为你是谁 一只骆驼,辛辛苦苦穿过了沙漠,一只苍蝇趴在骆驼...
    一条狗的流浪记阅读 1,638评论 0 0
  • 何时才能对自己诚实?我等待着。
    娇娇说阅读 975评论 0 0