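The previous article covered deploying Quartz in cluster mode with MySQL and Spring Boot. This article explains how cluster mode works, so that you can use and maintain a Quartz cluster more effectively.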
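2.1 Quartz scheduling principles
Main components
- Scheduler: the scheduling controller (StdScheduler)
  - Manages Triggers and Jobs
- Trigger: the unit that defines when a job fires
  - CronTrigger: defines the firing rule with a cron expression
  - SimpleTrigger: defines how many times the job runs and the interval between runs, similar to ScheduledExecutorService
- Job: the task to schedule; it defines how your business logic is executed
  - A Job can be associated with multiple Triggers, but a Trigger can be associated with only one Job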
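To make the SimpleTrigger description concrete, here is a minimal plain-Java sketch of how its fire times follow from a start time, an interval, and a repeat count. It does not use the Quartz API; `SimpleTriggerSketch` and `fireTimes` are names invented for this illustration. It follows the Quartz convention that the repeat count is the number of repeats after the first firing, so a repeat count of 2 produces three firings.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of SimpleTrigger-style timing; this is not Quartz code. */
final class SimpleTriggerSketch {

    /**
     * Returns the fire times of a trigger that first fires at startMillis and
     * then repeats repeatCount more times, intervalMillis apart.
     */
    static List<Long> fireTimes(long startMillis, long intervalMillis, int repeatCount) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(startMillis + i * intervalMillis);
        }
        return times;
    }

    public static void main(String[] args) {
        // First firing at t=1000 ms, then two repeats 500 ms apart.
        System.out.println(fireTimes(1_000L, 500L, 2));
    }
}
```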
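Threads
A Scheduler mainly uses two kinds of threads:
- The scheduling thread, which decides when triggers fire (QuartzSchedulerThread)
- The worker thread pool, which executes the jobs (obtained through QuartzSchedulerResources)
The core of the scheduling thread looks like this: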
public void run() {
    ...
    while (!halted.get()) {
        ...
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
            long now = System.currentTimeMillis();
            ...
            // Acquire the triggers that are due to fire next
            triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
            ...
            if (triggers != null && !triggers.isEmpty()) {
                ...
                now = System.currentTimeMillis();
                long triggerTime = triggers.get(0).getNextFireTime().getTime();
                long timeUntilTrigger = triggerTime - now;
                // Wait until the fire time is reached
                while(timeUntilTrigger > 2) {
                    ...
                    timeUntilTrigger = triggerTime - now;
                }
                ...
                List<TriggerFiredResult> bndles = qsRsrcs.getJobStore().triggersFired(triggers);
                ...
                for (int i = 0; i < bndles.size(); i++) {
                    TriggerFiredResult result = bndles.get(i);
                    TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                    ...
                    // Create the Job run shell and hand it to the worker thread pool
                    JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                    ...
                    qsRsrcs.getThreadPool().runInThread(shell);
                }
            }
        }
    }
}
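The batch acquisition step in the loop above can be sketched in plain Java. This is an in-memory simplification, not the Quartz implementation: `AcquireBatchSketch` and `TriggerRow` are hypothetical names, and the selection rule (earliest fire time first, at most `maxCount` triggers, nothing later than `noLaterThan + timeWindow`) is an assumption based on the parameters shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** In-memory sketch of picking the next batch of triggers; not Quartz code. */
final class AcquireBatchSketch {

    /** Hypothetical stand-in for a stored trigger. */
    static final class TriggerRow {
        final String name;
        final long nextFireTime;

        TriggerRow(String name, long nextFireTime) {
            this.name = name;
            this.nextFireTime = nextFireTime;
        }
    }

    /** Picks up to maxCount triggers that fire no later than noLaterThan + timeWindow. */
    static List<TriggerRow> acquireNextTriggers(
            List<TriggerRow> store, long noLaterThan, int maxCount, long timeWindow) {
        List<TriggerRow> candidates = new ArrayList<>(store);
        // Earliest fire time first, so the scheduling thread waits on the head of the batch.
        candidates.sort(Comparator.comparingLong((TriggerRow t) -> t.nextFireTime));
        List<TriggerRow> acquired = new ArrayList<>();
        for (TriggerRow t : candidates) {
            if (acquired.size() >= maxCount || t.nextFireTime > noLaterThan + timeWindow) {
                break;
            }
            acquired.add(t);
        }
        return acquired;
    }

    public static void main(String[] args) {
        List<TriggerRow> store = Arrays.asList(
                new TriggerRow("a", 100), new TriggerRow("b", 250),
                new TriggerRow("c", 900), new TriggerRow("d", 120));
        for (TriggerRow t : acquireNextTriggers(store, 200, 5, 100)) {
            System.out.println(t.name + " @ " + t.nextFireTime);
        }
    }
}
```

Capping the batch at the number of free worker threads, as the excerpt does with `Math.min(availThreadCount, ...)`, keeps a node from acquiring triggers it cannot run immediately.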
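2.2 How cluster mode works
Cluster mode works much like standalone mode, but it has to solve the following problems:
- Persisting Job information
- Detecting the state of the other nodes in the cluster, so that tasks can be recovered when a node fails (failover)
- Keeping task execution consistent, so that the same task is never acquired by several nodes and executed more than once
Persistence
A JobStore stores Job and Trigger information:
- RAMJobStore keeps everything in memory; it cannot persist data and cannot share state between cluster nodes
- JobStoreSupport persists data through a DriverDelegate (StdJDBCDelegate)
Database tables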
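Table | Description |
---|---|
QRTZ_CALENDARS | Quartz Calendar information |
QRTZ_CRON_TRIGGERS | CronTrigger data, including the cron expression and time zone |
QRTZ_FIRED_TRIGGERS | State of Triggers that have fired, plus execution information for the associated Job |
QRTZ_PAUSED_TRIGGER_GRPS | Paused Trigger groups |
QRTZ_SCHEDULER_STATE | State of every Scheduler instance in the cluster, including the other instances |
QRTZ_LOCKS | Lock rows |
QRTZ_JOB_DETAILS | Details of every configured Job |
QRTZ_SIMPLE_TRIGGERS | SimpleTrigger data |
QRTZ_BLOB_TRIGGERS | Triggers stored as BLOBs |
QRTZ_SIMPROP_TRIGGERS | Triggers stored as simple properties (for example CalendarIntervalTrigger and DailyTimeIntervalTrigger) |
QRTZ_TRIGGERS | Information about every configured Trigger |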
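Cluster scheduling
Related classes
JobStoreSupport: the database-backed job store implementation
StdRowLockSemaphore: the database row lock implementation
Detecting failed instances
Related table
QRTZ_SCHEDULER_STATE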
+-------------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+--------------+------+-----+---------+-------+
| SCHED_NAME | varchar(120) | NO | PRI | NULL | |
| INSTANCE_NAME | varchar(200) | NO | PRI | NULL | |
| LAST_CHECKIN_TIME | bigint(13) | NO | | NULL | |
| CHECKIN_INTERVAL | bigint(13) | NO | | NULL | |
+-------------------+--------------+------+-----+---------+-------+
Every instance sends a periodic heartbeat by updating its LAST_CHECKIN_TIME.
protected List<SchedulerStateRecord> findFailedInstances(Connection conn)
        throws JobPersistenceException {
    List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);
    for(SchedulerStateRecord rec: states) {
        ...
        if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
            foundThisScheduler = true;
            if (firstCheckIn) {
                failedInstances.add(rec);
            }
        } else {
            // The instance's check-in has expired, so it is considered failed
            if (calcFailedIfAfter(rec) < timeNow) {
                failedInstances.add(rec);
            }
        }
        ...
    }
    return failedInstances;
}
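The heartbeat check can be illustrated with a small in-memory sketch. The excerpt does not show `calcFailedIfAfter`, so the deadline used here (last check-in plus check-in interval plus a grace period) is an assumption, and `FailedInstanceSketch`, `StateRow`, and `graceMillis` are hypothetical names. The sketch also skips the first-check-in branch in which a node adds its own stale record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory sketch of heartbeat-based failure detection; not Quartz code. */
final class FailedInstanceSketch {

    /** Hypothetical stand-in for one QRTZ_SCHEDULER_STATE row. */
    static final class StateRow {
        final String instanceId;
        final long lastCheckinTime;
        final long checkinInterval;

        StateRow(String instanceId, long lastCheckinTime, long checkinInterval) {
            this.instanceId = instanceId;
            this.lastCheckinTime = lastCheckinTime;
            this.checkinInterval = checkinInterval;
        }
    }

    /** Returns the ids of other instances whose heartbeat deadline has passed. */
    static List<String> findFailedInstances(
            List<StateRow> rows, String thisInstanceId, long now, long graceMillis) {
        List<String> failed = new ArrayList<>();
        for (StateRow row : rows) {
            if (row.instanceId.equals(thisInstanceId)) {
                continue; // this node is obviously alive
            }
            // Assumed deadline: last heartbeat + expected interval + grace period.
            long failedIfAfter = row.lastCheckinTime + row.checkinInterval + graceMillis;
            if (failedIfAfter < now) {
                failed.add(row.instanceId);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<StateRow> rows = Arrays.asList(
                new StateRow("node-a", 0L, 1_000L),
                new StateRow("node-b", 11_500L, 1_000L),
                new StateRow("node-c", 2_000L, 1_000L));
        System.out.println(findFailedInstances(rows, "node-a", 12_000L, 500L));
    }
}
```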
Handling failed instances
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
        throws JobPersistenceException {
    if (failedInstances.size() > 0) {
        ...
        // Restore the Trigger state (ACQUIRED back to WAITING)
        getDelegate().updateTriggerStateFromOtherState(
                conn, tKey, STATE_WAITING,
                STATE_ACQUIRED);
        ...
        // Delete the failed instance's fired-trigger records
        getDelegate().deleteFiredTriggers(conn,
                rec.getSchedulerInstanceId());
        // Delete the failed instance's scheduler state
        if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
            getDelegate().deleteSchedulerState(conn,
                    rec.getSchedulerInstanceId());
        }
    }
}
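The three recovery steps in the excerpt (restore the trigger state, delete the fired-trigger records, delete the scheduler state) can be sketched against in-memory collections. This is a simplification under stated assumptions: `ClusterRecoverSketch` and `FiredRow` are hypothetical names, the maps and sets stand in for the database tables, and only the ACQUIRED to WAITING transition from the excerpt is modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory sketch of recovering the work of a failed node; not Quartz code. */
final class ClusterRecoverSketch {

    /** Hypothetical stand-in for one QRTZ_FIRED_TRIGGERS row. */
    static final class FiredRow {
        final String triggerKey;
        final String instanceId;

        FiredRow(String triggerKey, String instanceId) {
            this.triggerKey = triggerKey;
            this.instanceId = instanceId;
        }
    }

    final Map<String, String> triggerStates = new HashMap<>(); // stands in for QRTZ_TRIGGERS
    final List<FiredRow> firedTriggers = new ArrayList<>();    // stands in for QRTZ_FIRED_TRIGGERS
    final Set<String> schedulerStates = new HashSet<>();       // stands in for QRTZ_SCHEDULER_STATE

    void recover(String failedInstanceId, String thisInstanceId) {
        Iterator<FiredRow> it = firedTriggers.iterator();
        while (it.hasNext()) {
            FiredRow row = it.next();
            if (!row.instanceId.equals(failedInstanceId)) {
                continue;
            }
            // Step 1: release the trigger so another node can acquire it again.
            triggerStates.replace(row.triggerKey, "ACQUIRED", "WAITING");
            // Step 2: drop the fired-trigger record owned by the failed node.
            it.remove();
        }
        // Step 3: forget the failed node, unless the record is our own.
        if (!failedInstanceId.equals(thisInstanceId)) {
            schedulerStates.remove(failedInstanceId);
        }
    }

    public static void main(String[] args) {
        ClusterRecoverSketch store = new ClusterRecoverSketch();
        store.triggerStates.put("t1", "ACQUIRED");
        store.firedTriggers.add(new FiredRow("t1", "node-b"));
        store.schedulerStates.add("node-a");
        store.schedulerStates.add("node-b");
        store.recover("node-b", "node-a");
        System.out.println(store.triggerStates + " " + store.schedulerStates);
    }
}
```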
How a Scheduler acquires Triggers
protected <T> T executeInNonManagedTXLock(
        String lockName,
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    ...
    // Obtain the lock first
    transOwner = getLockHandler().obtainLock(conn, lockName);
    ...
    // Run the SQL that selects the triggers and records the acquisition
    final T result = txCallback.execute(conn);
    ...
    // Release the lock
    releaseLock(lockName, transOwner);
}
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
    List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
    for(TriggerKey triggerKey: keys) {
        JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper());
        int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
        getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
        acquiredTriggers.add(nextTrigger);
    }
    return acquiredTriggers;
}
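The reason the lock prevents duplicate execution can be shown with a small concurrency sketch. Here a `ReentrantLock` stands in for the database row lock, threads stand in for cluster nodes, and a map stands in for the trigger table; `LockedAcquireSketch` and its methods are hypothetical names. Because the WAITING to ACQUIRED transition happens only while the lock is held, each trigger ends up with exactly one node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of lock-protected trigger acquisition; threads play the cluster nodes. */
final class LockedAcquireSketch {

    private final ReentrantLock triggerAccess = new ReentrantLock(); // stands in for the row lock
    private final Map<String, String> triggerStates = new LinkedHashMap<>();

    LockedAcquireSketch(int triggerCount) {
        for (int i = 0; i < triggerCount; i++) {
            triggerStates.put("trigger-" + i, "WAITING");
        }
    }

    /** Acquires up to maxCount WAITING triggers while holding the lock. */
    List<String> acquire(int maxCount) {
        triggerAccess.lock();
        try {
            List<String> acquired = new ArrayList<>();
            for (Map.Entry<String, String> e : triggerStates.entrySet()) {
                if (acquired.size() >= maxCount) {
                    break;
                }
                if ("WAITING".equals(e.getValue())) {
                    e.setValue("ACQUIRED"); // state change is visible to the next lock holder
                    acquired.add(e.getKey());
                }
            }
            return acquired;
        } finally {
            triggerAccess.unlock();
        }
    }

    /** Runs several competing nodes and returns every trigger key they acquired. */
    static List<String> runNodes(int nodes, int triggerCount, int batchSize) {
        LockedAcquireSketch store = new LockedAcquireSketch(triggerCount);
        List<String> all = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            Thread t = new Thread(() -> {
                List<String> got;
                while (!(got = store.acquire(batchSize)).isEmpty()) {
                    all.addAll(got);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(runNodes(4, 100, 3).size() + " triggers acquired in total");
    }
}
```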
Database row lock implementation
Related table
QRTZ_LOCKS
+------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+-------+
| SCHED_NAME | varchar(120) | NO | PRI | NULL | |
| LOCK_NAME | varchar(40) | NO | PRI | NULL | |
+------------+--------------+------+-----+---------+-------+
public interface Semaphore {
    boolean obtainLock(Connection conn, String lockName) throws LockException;
    void releaseLock(String lockName) throws LockException;
    boolean requiresConnection();
}
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
        + SCHED_NAME_SUBST + ", ?)";
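SELECT ... FOR UPDATE takes an exclusive lock on the matching row. Any other session that issues a locking read (such as another SELECT ... FOR UPDATE) or a write against the same row blocks until the locking session commits or rolls back. With MySQL InnoDB, a plain non-locking SELECT is not blocked.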
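A minimal JDBC sketch of this row lock follows. It is not the StdRowLockSemaphore source: `RowLockSketch`, `expand`, and `obtainLock` are names invented here, the `{0}` and `{1}` placeholders assume MessageFormat-style substitution of the table prefix and scheduler name, and the fallback insert is a simplification of what happens when the lock row does not exist yet.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;

/** Sketch of a database row lock built on SELECT ... FOR UPDATE; not Quartz code. */
final class RowLockSketch {

    // Assumed template form: {0} is the table prefix, {1} the quoted scheduler name.
    static final String SELECT_FOR_LOCK =
            "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";
    static final String INSERT_LOCK =
            "INSERT INTO {0}LOCKS(SCHED_NAME, LOCK_NAME) VALUES ({1}, ?)";

    /** Fills in the table prefix and scheduler name (no escaping; illustration only). */
    static String expand(String template, String tablePrefix, String schedName) {
        return MessageFormat.format(template, tablePrefix, "'" + schedName + "'");
    }

    /**
     * Blocks until this transaction holds the named lock row. The lock is
     * released when the caller commits or rolls back the connection.
     */
    static void obtainLock(Connection conn, String tablePrefix, String schedName, String lockName)
            throws SQLException {
        if (conn.getAutoCommit()) {
            throw new IllegalStateException("row locks need an open transaction");
        }
        String select = expand(SELECT_FOR_LOCK, tablePrefix, schedName);
        try (PreparedStatement ps = conn.prepareStatement(select)) {
            ps.setString(1, lockName);
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    return; // row found and now locked by this transaction
                }
            }
        }
        // No lock row yet: insert it; the new row stays locked until commit.
        try (PreparedStatement ins = conn.prepareStatement(expand(INSERT_LOCK, tablePrefix, schedName))) {
            ins.setString(1, lockName);
            ins.executeUpdate();
        }
    }

    public static void main(String[] args) {
        System.out.println(expand(SELECT_FOR_LOCK, "QRTZ_", "MyScheduler"));
    }
}
```

Tying the lock to the transaction means a node that crashes mid-acquisition releases the lock automatically when its connection is closed and the transaction rolls back.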
2.3 Miscellaneous
Handling misfired tasks
- MISFIRE_INSTRUCTION_FIRE_ONCE_NOW: fire once immediately
- MISFIRE_INSTRUCTION_DO_NOTHING: skip the missed firing and wait for the next scheduled one
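The difference between the two instructions can be sketched as a function that picks the next fire time after a misfire. This is an illustration only: `MisfireSketch` and its methods are hypothetical, a fixed interval stands in for a cron schedule, and the misfire test (the fire time is older than now minus a threshold) is an assumption modeled on the `getMisfireTime()` call in the acquisition code.

```java
/** Sketch of the two misfire instructions on a fixed-interval schedule; not Quartz code. */
final class MisfireSketch {

    enum Instruction { FIRE_ONCE_NOW, DO_NOTHING }

    /** A trigger counts as misfired once it is overdue by more than the threshold. */
    static boolean isMisfired(long nextFireTime, long now, long misfireThresholdMillis) {
        return nextFireTime < now - misfireThresholdMillis;
    }

    /** Chooses the next fire time for a trigger that missed missedFireTime. */
    static long nextFireTimeAfterMisfire(
            Instruction instruction, long missedFireTime, long intervalMillis, long now) {
        if (instruction == Instruction.FIRE_ONCE_NOW) {
            return now; // run once immediately, then resume the normal schedule
        }
        // DO_NOTHING: skip every missed slot and take the first slot strictly after now.
        long slotsToSkip = (now - missedFireTime) / intervalMillis + 1;
        return missedFireTime + slotsToSkip * intervalMillis;
    }

    public static void main(String[] args) {
        long missed = 1_000L, interval = 500L, now = 2_200L;
        System.out.println(nextFireTimeAfterMisfire(Instruction.FIRE_ONCE_NOW, missed, interval, now));
        System.out.println(nextFireTimeAfterMisfire(Instruction.DO_NOTHING, missed, interval, now));
    }
}
```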
Job types
- Stateful Jobs cannot run concurrently; mark them with the PersistJobDataAfterExecution and DisallowConcurrentExecution annotations
- Stateless Jobs can run concurrently
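The effect of disallowing concurrent execution can be sketched with a simple in-memory guard. This is a conceptual illustration, not how Quartz implements it (Quartz enforces the rule inside the job store by blocking the job's other triggers); `ConcurrencyGuardSketch`, `tryStart`, and `finish` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Conceptual sketch of "no concurrent execution per job"; not Quartz code. */
final class ConcurrencyGuardSketch {

    private final Set<String> running = ConcurrentHashMap.newKeySet();

    /**
     * Returns true if the job may start now. Jobs that disallow concurrent
     * execution are refused while a previous run is still in progress.
     */
    boolean tryStart(String jobKey, boolean disallowConcurrent) {
        if (!disallowConcurrent) {
            return true; // stateless jobs may overlap freely
        }
        return running.add(jobKey); // false if the key is already running
    }

    /** Marks a run of the job as finished. */
    void finish(String jobKey) {
        running.remove(jobKey);
    }

    public static void main(String[] args) {
        ConcurrencyGuardSketch guard = new ConcurrencyGuardSketch();
        System.out.println(guard.tryStart("report", true));
        System.out.println(guard.tryStart("report", true));
        guard.finish("report");
        System.out.println(guard.tryStart("report", true));
    }
}
```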