本文要点
- 1.Quartz相关重要概念
- 2.如何实现分布式调度
- 3.核心源码的实现
1.Quartz相关重要概念
1.1 scheduler:: 任务调度器
如图所示:
Scheduler 有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。
- RemoteMBeanScheduler:
- RemoteScheduler:
- StdScheduler:
1.3 job:: 任务,即被调度的任务
主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。
对于同一个 trigger 来说,有状态(添加@DisallowConcurrentExecution注解
)的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。
一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。
1.4 misfire::错过的,指本来应该被执行但实际没有被执行的任务调度
系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里,可能有些任务会被 misfire;
Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire;
线程池中所有线程都被占用,导致任务无法被触发执行,造成 misfire;
有状态任务在下次触发时间到达时,上次执行还没有结束;
2.Quartz如何实现分布式调度
3.核心源码的实现
Quartz是基于DB锁来实现的分布式调度。
首先要明确Trigger有几个状态: WAITING(等待)、ACQUIRED(获取)、EXECUTING(执行)、COMPLETE(完成)、BLOCKED(阻塞)、ERROR(报错)、PAUSED(暂停)、PAUSED_BLOCKED(暂停阻塞)、DELETED(删除)
。
具体看下org.quartz.core.QuartzSchedulerThread
核心代码的具体实现
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
/**
* 默认没加锁
*
* 1.查询 TRIGGERS 状态是WAITING 的trigger
*
* 2.更新 TRIGGERS WAITING---->ACQUIRED
*
* 3.插入FIRED_TRIGGERS表
*/
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();//第一个最早开始
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
/**
* 获取TRIGGER_ACCESS 锁
* 4.查询TRIGGER 状态校验状态是否是 STATE_ACQUIRED,不是的话就过滤掉,是的话获取job
* 5.获取并更新TRIGGER状态为STATE_EXECUTING,报错的话更新为ERROR
*/
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
//执行shell
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}