利用学习的时间这里写了个Spring和Quartz结合的一个web项目,纯后端的项目,restful接口
实现对定时任务的增、删、改、查、停止, 启动、定时规则修改、立即执行等。github地址:holly-quartz-web,这里刚开始是为了学习源码,后来有了一些改动,再后来就想做一些业务上的改造,所以clone了一个quartz-core的项目进行改造,后期打算对其集群方式进行改造等等。github地址:quartz-core,有一起感兴趣的朋友可以一起改造,目前的项目比较简单可以作为学习入门的项目,也可以作为搭建job管理系统的初期项目,慢慢迭代。
在四的时候我们讲了下整体run方法以及集群实现的核心思想,进一步解释这条规则就是:一个调度器实例在执行涉及到分布式问题的数据库操作前,首先要获取QUARTZ2_LOCKS表中对应当前调度器的行级锁,获取锁后即可执行其他表中的数据库操作,随着操作事务的提交,行级锁被释放,供其他调度器实例获取.
集群中的每一个调度器实例都遵循这样一种严格的操作规程,那么对于同一类调度器来说,每个实例对数据库的操作只能是串行的.而不同名的调度器之间却可以并行执行.这节我们看下细节,Quartz的一些设计上的取舍,以及有节点宕机后的job恢复执行(别的服务器节点是怎么接替它的任务的)。
在JobStoreSupport类中有个内部类ClusterManager,ClusterManager也是个Thread,在run方法中
@Override
public void run() {
while (!shutdown) {
if (!shutdown) {
long timeToSleep = getClusterCheckinInterval();
long transpiredTime = (System.currentTimeMillis() - lastCheckin);
timeToSleep = timeToSleep - transpiredTime;
if (timeToSleep <= 0) {
timeToSleep = 100L;
}
if(numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}
if (!shutdown && this.manage()) {
signalSchedulingChangeImmediately(0L);
}
}//while !shutdown
}
}
从方法中可以看出 run期间会sleep一会,实际运行的是manage方法
private boolean manage() {
boolean res = false;
try {
res = doCheckin();
numFails = 0;
getLog().debug("ClusterManager: Check-in complete.");
} catch (Exception e) {
if(numFails % 4 == 0) {
getLog().error(
"ClusterManager: Error managing cluster: "
+ e.getMessage(), e);
}
numFails++;
}
return res;
}
实际的方法还是在
protected boolean doCheckin() throws JobPersistenceException {
boolean transOwner = false;
boolean transStateOwner = false;
boolean recovered = false;
Connection conn = getNonManagedTXConnection();
try {
// Other than the first time, always checkin first to make sure there is
// work to be done before we acquire the lock (since that is expensive,
// and is almost never necessary). This must be done in a separate
// transaction to prevent a deadlock under recovery conditions.
List<SchedulerStateRecord> failedRecords = null;
if (!firstCheckIn) {
failedRecords = clusterCheckIn(conn);
commitConnection(conn);
}
if (firstCheckIn || (failedRecords.size() > 0)) {
getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
transStateOwner = true;
// Now that we own the lock, make sure we still have work to do.
// The first time through, we also need to make sure we update/create our state record
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
if (failedRecords.size() > 0) {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
transOwner = true;
clusterRecover(conn, failedRecords);
recovered = true;
}
}
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} finally {
try {
releaseLock(LOCK_TRIGGER_ACCESS, transOwner);
} finally {
try {
releaseLock(LOCK_STATE_ACCESS, transStateOwner);
} finally {
cleanupConnection(conn);
}
}
}
firstCheckIn = false;
return recovered;
}
先检查数据库中的,再对自身做一下检查。当发现最后有失败的节点的时候会进行恢复。clusterRecover方法就是进行恢复的方法。
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
throws JobPersistenceException {
if (failedInstances.size() > 0) {
long recoverIds = System.currentTimeMillis();
logWarnIfNonZero(failedInstances.size(),
"ClusterManager: detected " + failedInstances.size()
+ " failed or restarted instances.");
try {
for (SchedulerStateRecord rec : failedInstances) {
getLog().info(
"ClusterManager: Scanning for instance \""
+ rec.getSchedulerInstanceId()
+ "\"'s failed in-progress jobs.");
List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
.selectInstancesFiredTriggerRecords(conn,
rec.getSchedulerInstanceId());
int acquiredCount = 0;
int recoveredCount = 0;
int otherCount = 0;
Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
for (FiredTriggerRecord ftRec : firedTriggerRecs) {
TriggerKey tKey = ftRec.getTriggerKey();
JobKey jKey = ftRec.getJobKey();
triggerKeys.add(tKey);
// release blocked triggers..
if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_WAITING, STATE_BLOCKED);
} else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
// release acquired triggers..
if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
getDelegate().updateTriggerStateFromOtherState(
conn, tKey, STATE_WAITING,
STATE_ACQUIRED);
acquiredCount++;
} else if (ftRec.isJobRequestsRecovery()) {
// handle jobs marked for recovery that were not fully
// executed..
if (jobExists(conn, jKey)) {
@SuppressWarnings("deprecation")
SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
"recover_"
+ rec.getSchedulerInstanceId()
+ "_"
+ String.valueOf(recoverIds++),
Scheduler.DEFAULT_RECOVERY_GROUP,
new Date(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobName(jKey.getName());
rcvryTrig.setJobGroup(jKey.getGroup());
rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
rcvryTrig.setPriority(ftRec.getPriority());
JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobDataMap(jd);
rcvryTrig.computeFirstFireTime(null);
storeTrigger(conn, rcvryTrig, null, false,
STATE_WAITING, false, true);
recoveredCount++;
} else {
getLog()
.warn(
"ClusterManager: failed job '"
+ jKey
+ "' no longer exists, cannot schedule recovery.");
otherCount++;
}
} else {
otherCount++;
}
// free up stateful job's triggers
if (ftRec.isJobDisallowsConcurrentExecution()) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_WAITING, STATE_BLOCKED);
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
}
getDelegate().deleteFiredTriggers(conn,
rec.getSchedulerInstanceId());
// Check if any of the fired triggers we just deleted were the last fired trigger
// records of a COMPLETE trigger.
int completeCount = 0;
for (TriggerKey triggerKey : triggerKeys) {
if (getDelegate().selectTriggerState(conn, triggerKey).
equals(STATE_COMPLETE)) {
List<FiredTriggerRecord> firedTriggers =
getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
if (firedTriggers.isEmpty()) {
if (removeTrigger(conn, triggerKey)) {
completeCount++;
}
}
}
}
logWarnIfNonZero(acquiredCount,
"ClusterManager: ......Freed " + acquiredCount
+ " acquired trigger(s).");
logWarnIfNonZero(completeCount,
"ClusterManager: ......Deleted " + completeCount
+ " complete triggers(s).");
logWarnIfNonZero(recoveredCount,
"ClusterManager: ......Scheduled " + recoveredCount
+ " recoverable job(s) for recovery.");
logWarnIfNonZero(otherCount,
"ClusterManager: ......Cleaned-up " + otherCount
+ " other failed job(s).");
if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
getDelegate().deleteSchedulerState(conn,
rec.getSchedulerInstanceId());
}
}
} catch (Throwable e) {
throw new JobPersistenceException("Failure recovering jobs: "
+ e.getMessage(), e);
}
}
}
期间集群检查及恢复涉及到核心表示QRTZ_SCHEDULER_STATE表。会有检查的时间和间隔。判断是否故障想必大家很明白了。检查时间长时间没有更新。
发现有故障的节点 会接管对应的任务,并将该节点的数据删除以免下次再检测到,然后重复接管。
集群方面基本就是这些内容,细节还得debug代码,好多我自己也不知道。看的出quartz在设计的时候有好多取舍的地方,以数据库为边界的操作,数据库随时有可能发生变化。 如果这时调度器发生了改变,新的trigger添加进来,那么有可能新添加的trigger比当前待执行的trigger更急迫,那么需要放弃当前trigger重新获取,然而,这里存在一个值不值得的问题,如果重新获取新trigger的时间要长于当前时间到新trigger出发的时间,那么即使放弃当前的trigger,仍然会导致xntrigger获取失败,但我们又不知道获取新的trigger需要多长时间,于是,我们做了一个主观的评判,若jobstore为RAM,那么假定获取时间需要7ms,若jobstore是持久化的,假定其需要70ms,当前时间与新trigger的触发时间之差小于这个值的我们认为不值得重新获取。这些都算是取舍吧,还有官方的文档介绍,集群特性对于高cpu使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源.这种情况随着节点的增加会越来越严重. 所以比较适合较长时间执行一次的任务
Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers.
The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).
整个分析先到此处,后续计划 会支持注解方式,即在一个类上用注解的方式即可实现任务的添加。集群方式改造,希望引入zookeeper实现。减轻短小任务对数据库的压力以及引入rpc模式实现远程电泳任务等等,后续持续更新。。。