任务调度简介
1、什么时候需要任务调度
业务场景:
1)账单日或者还款日上午 9 点,给每个信用卡客户发送账单通知,还款通知,如何判断客户的账单日、还款日,完成通知的发送?
2)银行业务系统,夜间要完成跑批的一系列流程,清理数据,下载文件,解析文件,对账清算、切换结算日期等等,如何触发一系列流程的执行?
3)金融机构跟人民银行二代支付系统对接,人民银行要求低于 5W 的金额(小额支付)半个小时打一次包发送,以缓解并发压力。所以,银行的跨行转账分成了多个流程:
录入、复核、发送。如何把半个小时以内的所有数据一次性发送?
类似于这种 1、基于准确的时刻或者固定的时间间隔触发的任务,或者 2、有批量数据需要处理,或者 3、要实现两个动作解耦的场景,我们都可以用任务调度来实现
2、任务调度需求分析
基本需求:
1)可以定义触发的规则,比如基于时刻、时间间隔、表达式
2)可以定义需要执行的任务。比如执行一个脚本或者一段代码。任务和规则是分开的
3)集中管理配置,持久配置。不用把规则写在代码里面,可以看到所有的任务配置,方便维护,重启之后任务可以再次调度——配置文件或者配置中心
4)支持任务的串行执行,例如执行 A 任务后再执行 B 任务再执行 C 任务
5)支持多个任务并发执行,互不干扰(例如ScheduledThreadPoolExecutor 线程池)
6)有自己的调度器,可以启动、中断、停止任务
7)容易集成到 Spring
3、任务调度工具对比
| 层次 | 举例 | 特点 |
|---|---|---|
| 操作系统 | Linux Window 计划任务 | 只能执行简单脚本或者命令 |
| 数据库 | MySQL Oracle | 可以操作数据,不能执行java代码 |
| 工具 | kettle | 可以操作数据库,执行脚本,没有集中配置 |
| 开发语言 | JDK Timer、ScheduledThreadPool | Timer:单线程 JDK1.5 之后:ScheduledThreadPool(Cache、Fiexed、Single):没有集中配置,日常管理不够灵活 |
| 容器 | Spring Task、@Scheduled | 不支持集群 |
| 分布式框架 | XXL-JOB、Elastic-Job | 支持集群,集中配置,容易管理 |
@Scheduled 也是用 JUC 的 ScheduledExecutorService 实现的
Scheduled(cron = “0/5 * * * * ?”)
1) ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法将@Scheduled 的方法包装为指定的 task添加到 ScheduledTaskRegistrar 中
2) ScheduledAnnotationBeanPostProcessor 会监听 Spring 的容器初始化事件,在 Spring 容器初始化完成后进行TaskScheduler 实现类实例的查找,若发现有 SchedulingConfigurer 的实现类实例,则跳过 3
3) 查找 TaskScheduler 的实现类实例默认是通过类型查找,若有多个实现则会查找名字为"taskScheduler"的实现 Bean,若没有找到则在 ScheduledTaskRegistrar 调度任务的时候会创建一个newSingleThreadScheduledExecutor,将TaskScheduler 的实现类实例设置到 ScheduledTaskRegistrar 属性中
4)ScheduledTaskRegistrar 的 scheduleTasks 方法触发任务调度
5)真正调度任务的类是 TaskScheduler 实现类中的 ScheduledExecutorService,由 J.U.C 提供
Quartz基本介绍
Quatz 是一个特性丰富的,开源的任务调度库,它几乎可以嵌入所有的 Java 程序,从很小的独立应用程序到大型商业系统。Quartz 可以用来创建成百上千的简单的或者复杂的任务,这些任务可以用来执行任何程序可以做的事情。Quartz 拥有很多企业级的特性,包括支持 JTA 事务和集群
Quartz 是一个老牌的任务调度系统,98 年构思,01 年发布到 sourceforge。现在更新比较慢,因为已经非常成熟了。
Quartz 的目的就是让任务调度更加简单,开发人员只需要关注业务即可。他是用 Java 语言编写的(也有.NET 的版本)。Java 代码能做的任何事情,Quartz 都可以调度。
特点:
a)精确到毫秒级别的调度
b)可以独立运行,也可以集成到容器中
c)支持事务(JobStoreCMT )
d)支持集群
e)支持持久化
Quartz Java编程
1、引入依赖
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
2、默认配置文件
org.quartz.core 包下,有一个默认的配置文件,quartz.properties,当我们没有定义一个同名的配置文件的时候,就会使用默认配置文件里面的配置
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
3、创建Job
实现唯一的方法 execute(),方法中的代码就是任务执行的内容
public class MyJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("定时任务测试");
}
}
在测试类方法中,把 Job 进一步包装成 JobDetail,必须要指定 JobName 和 groupName,两个合起来是唯一标识符,可以携带 KV 的数据(JobDataMap),用于扩展属性,在运行的时候可以从 context获取到
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
.withIdentity("job1", "group1")
.usingJobData("vincent","666")
.usingJobData("liao",1314)
.build();
4、创建Trigger
在测试类 main()方法中,基于 SimpleTrigger 定义了一个每 2 秒钟运行一次、不断重复的 Trigger:
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();
5、创建Scheduler
在测试类 main()方法中,通过 Factory 获取调度器的实例,把 JobDetail 和 Trigger绑定,注册到容器中
Scheduler 启动顺序无所谓,只要有 Trigger 到达触发条件,就会执行任务
SchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
调度器一定是单例的
6、体系结构总结

1)JobDetail
创建一个实现 Job 接口的类,使用 JobBuilder 包装成 JobDetail,它可以携带KV 的数据
2)Trigger
定义任务的触发规律:Trigger,使用 TriggerBuilder 来构建
JobDetail 跟 Trigger 是 1:N 的关系
为什么要解耦?
Trigger 接口在 Quartz 有 4 个继承的子接口:
| 子接口 | 描述 | 特点 |
|---|---|---|
| SimpleTrigger | 简单触发器 | 固定时刻或时间间隔,单位是毫秒 |
| CalendarIntervalTrigger | 基于日历的触发器 | 比简单触发器更多时间单位,支持非固定时间的触发,例如一年可能 365/366,一个月可能 28/29/30/31 |
| DailyTimeIntervalTrigger | 基于日期的触发器 | 每天的某个时间段 |
| CronTrigger | 基于Cron表达式的触发器 |
SimpleTrigger
SimpleTrigger 可以定义固定时刻或者固定时间间隔的调度规则(精确到毫秒)
例如:每天 9 点钟运行;每隔 30 分钟运行一次
CalendarIntervalTrigger
可以定义更多时间单位的调度需求,精确到秒
好处是:不需要去计算时间间隔,比如 1 个小时等于多少毫秒
例如每年、每个月、每周、每天、每小时、每分钟、每秒、每年的月数和每个月的天数不是固定的,这种情况也适用
DailyTimeIntervalTrigger
每天的某个时间段内,以一定的时间间隔执行任务
例如:每天早上 10 点到晚上 7 点,每隔半个小时执行一次,并且只在周一到周六执行
CronTrigger
可以定义基于 Cron 表达式的调度规则,是最常用的触发器类型
Cron表达式
| 位置 | 时间域 | 特殊值 | |
|---|---|---|---|
| 1 | 秒 | 0-59 | , - * / |
| 2 | 分钟 | 0-59 | , - * / |
| 3 | 小时 | 0-23 | , - * / |
| 4 | 日期 | 1-31 | , - * ? / L W C |
| 5 | 月份 | 1-12 | , - * / |
| 6 | 星期 | 1-7 | , - * ? / L W C |
| 7 | 年份 | 1-31 | , - * / |
解析:
星号(*):可用在所有字段中,表示对应时间域的每一个时刻,例如,在分钟字段时,表示“每分钟”;
问号(?):该字符只在日期和星期字段中使用,它通常指定为“无意义的值”,相当于点位符;
减号(-):表达一个范围,如在小时字段中使用“10-12”,则表示从 10 到 12 点,即 10,11,12;
逗号(,):表达一个列表值,如在星期字段中使用“MON,WED,FRI”,则表示星期一,星期三和星期五;
斜杠(/):x/y 表达一个等步长序列,x 为起始值,y 为增量步长值。如在分钟字段中使用 0/15,则表示为 0,15,30 和45 秒,而 5/15 在分钟字段中表示 5,20,35,50,你也可以使用*/y,它等同于 0/y;
L:该字符只在日期和星期字段中使用,代表“Last”的意思,但它在两个字段中意思不同。L 在日期字段中,表示这个月份的最后一天,如一月的 31 号,非闰年二月的 28 号;如果 L 用在星期中,则表示星期六,等同于 7。但是,如果 L 出现在星期字段里,而且在前面有一个数值 X,则表示“这个月的最后 X 天”,例如,6L 表示该月的最后星期五;
W:该字符只能出现在日期字段里,是对前导日期的修饰,表示离该日期最近的工作日。例如 15W 表示离该月 15号最近的工作日,如果该月 15 号是星期六,则匹配 14 号星期五;如果 15 日是星期日,则匹配 16 号星期一;如果 15号是星期二,那结果就是 15 号星期二。但必须注意关联的匹配日期不能够跨月,如你指定 1W,如果 1 号是星期六,结果匹配的是 3 号星期一,而非上个月最后的那天。W 字符串只能指定单一日期,而不能指定日期范围;
LW 组合:在日期字段可以组合使用 LW,它的意思是当月的最后一个工作日;`井号(#)`:该字符只能在星期字段中使用,表示当月某个工作日。如 `6#3` 表示当月的第三个星期五(6 表示星期五,`#3`表示当前的第三个),而 `4#5` 表示当月的第五个星期三,假设当月没有第五个星期三,忽略不触发;
C:该字符只在日期和星期字段中使用,代表“Calendar”的意思。它的意思是计划所关联的日期,如果日期没有被关联,则相当于日历中所有日期。例如 5C 在日期字段中就相当于日历 5 日以后的第一天。1C 在星期字段中相当于星期日后的第一天。Cron 表达式对特殊字符的大小写不敏感,对代表星期的缩写英文大小写也不敏感
3)Scheduler
调度器,是 Quartz 的指挥官,由 StdSchedulerFactory 产生。它是单例的
Quartz 中最重要的 API,默认是实现类是 StdScheduler,里面包含了一个QuartzScheduler,QuartzScheduler 里面又包含了一个 QuartzSchedulerThread
Scheduler 中的方法主要分为三大类:
1)操作调度器本身,例如调度器的启动 start()、调度器的关闭 shutdown()
2)操作 Trigger,例如 pauseTriggers()、resumeTrigger()
3)操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()
4)Listener
需求:在每个任务运行结束之后发送通知给运维管理员,那是不是
要在每个任务的最后添加一行代码呢?这种方式对原来的代码造成了入侵,不利于维护,如果代码不是写在任务代码的最后一行,怎么知道任务执行完了呢?或者说,怎么监测到任务的生命周期呢?
观察者模式:定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则所有依赖它的对象都会得到通知并自动更新
Quartz 中提供了三种 Listener:
监听 Scheduler 的,监听 Trigger 的,监听 Job 的
只需要创建类实现相应的接口,并在 Scheduler 上注册 Listener,便可实现对核心对象的监听
JobListener
public class MyJobListener implements JobListener {
//返回JobListener名称
public String getName() {
String name = getClass().getSimpleName();
System.out.println( "Method 111111 :"+ "获取到监听器名称:"+name);
return name;
}
//Scheduler在JobDetail将要被执行时调用这个方法
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 222222 :"+ jobName + " ——任务即将执行 ");
}
//Scheduler在JobDetail即将被执行,但又被TriggerListener否决了时调用这个方法
public void jobExecutionVetoed(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 333333 :"+ jobName + " ——任务被否决 ");
}
//Scheduler在JobDetail被执行之后调用这个方法
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 444444 :"+ jobName + " ——执行完毕 ");
System.out.println("------------------");
}
}
public class MyJobListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 创建并注册一个全局的Job Listener
scheduler.getListenerManager().addJobListener(new MyJobListener(), EverythingMatcher.allJobs());
scheduler.start();
}
}
工具类:ListenerManager,用于添加、获取、移除监听器
工具类:Matcher,主要是基于 groupName 和 keyName 进行匹配。
TriggerListener
public class MyTriggerListener implements TriggerListener {
private String name;
public MyTriggerListener(String name) {
this.name = name;
}
//返回监听器的名称
public String getName() {
return name;
}
// Trigger 被触发,Job 上的 execute() 方法将要被执行时,Scheduler就调用这个方法
public void triggerFired(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 11111 " + triggerName + " was fired");
}
// 在 Trigger 触发后,Job 将要被执行时由 Scheduler 调用这个方法
// TriggerListener 给了一个选择去否决 Job 的执行,如果返回true时,这个任务不会被触发
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 222222 " + triggerName + " was not vetoed");
return false;
}
//Trigger 错过触发时调用
public void triggerMisfired(Trigger trigger) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 333333 " + triggerName + " misfired");
}
//Trigger 被触发并且完成了 Job 的执行时,Scheduler 调用这个方法
public void triggerComplete(Trigger trigger, JobExecutionContext context,
Trigger.CompletedExecutionInstruction triggerInstructionCode) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 444444 " + triggerName + " is complete");
System.out.println("------------");
}
}
public class MyTriggerListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 创建并注册一个全局的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener1"), EverythingMatcher.allTriggers());
// 创建并注册一个局部的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener2"), KeyMatcher.keyEquals(TriggerKey.triggerKey("trigger1", "gourp1")));
// 创建并注册一个特定组的Trigger Listener
GroupMatcher<TriggerKey> matcher = GroupMatcher.triggerGroupEquals("gourp1");
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener3"), matcher);
scheduler.start();
}
}
SchedulerListener
public class MySchedulerListener implements SchedulerListener {
public void jobScheduled(Trigger trigger) {
String jobName = trigger.getJobKey().getName();
System.out.println( jobName + " has been scheduled");
}
public void jobUnscheduled(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being unscheduled");
}
public void triggerFinalized(Trigger trigger) {
System.out.println("Trigger is finished for " + trigger.getJobKey().getName());
}
public void triggerPaused(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being paused");
}
public void triggersPaused(String triggerGroup) {
System.out.println("trigger group "+triggerGroup + " is being paused");
}
public void triggerResumed(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being resumed");
}
public void triggersResumed(String triggerGroup) {
System.out.println("trigger group "+triggerGroup + " is being resumed");
}
public void jobAdded(JobDetail jobDetail) {
System.out.println(jobDetail.getKey()+" is added");
}
public void jobDeleted(JobKey jobKey) {
System.out.println(jobKey+" is deleted");
}
public void jobPaused(JobKey jobKey) {
System.out.println(jobKey+" is paused");
}
public void jobsPaused(String jobGroup) {
System.out.println("job group "+jobGroup+" is paused");
}
public void jobResumed(JobKey jobKey) {
System.out.println(jobKey+" is resumed");
}
public void jobsResumed(String jobGroup) {
System.out.println("job group "+jobGroup+" is resumed");
}
public void schedulerError(String msg, SchedulerException cause) {
System.out.println(msg + cause.getUnderlyingException().getStackTrace());
}
public void schedulerInStandbyMode() {
System.out.println("scheduler is in standby mode");
}
public void schedulerStarted() {
System.out.println("scheduler has been started");
}
public void schedulerStarting() {
System.out.println("scheduler is being started");
}
public void schedulerShutdown() {
System.out.println("scheduler has been shutdown");
}
public void schedulerShuttingdown() {
System.out.println("scheduler is being shutdown");
}
public void schedulingDataCleared() {
System.out.println("scheduler has cleared all data");
}
}
public class MySchedulerListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 创建Scheduler Listener
scheduler.getListenerManager().addSchedulerListener(new MySchedulerListener());
scheduler.start();
}
}
5)JobStore
最多可以运行多少个任务(磁盘、内存、线程数)?
Jobstore 用来存储任务和触发器相关的信息,例如所有任务的名称、数量、状态等等
Quartz 中有两种存储任务的方式:一种存在内存,一种是存在数据库
RAMJobStore
Quartz 默认的 JobStore 是 RAMJobstore,也就是把任务和触发器信息运行的信息存储在内存中,用到了 HashMap、TreeSet、HashSet 等等数据结构
如果程序崩溃或重启,所有存储在内存中的数据都会丢失,所以我们需要把这些数据持久化到磁盘
JDBCJobStore
JDBCJobStore 可以通过 JDBC 接口,将任务运行数据保存在数据库中
JDBC 的实现方式有两种,JobStoreSupport 类的两个子类:
JobStoreTX:在独立的程序中使用,自己管理事务,不参与外部事务
JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事务时,可以使用他
使用JDBCJobStore时,需要配置数据的信息:
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties,不使用默认配置
org.quartz.jobStore.useProperties:true
#数据库中 quartz 表的表名前缀
org.quartz.jobStore.tablePrefix:QRTZ_
org.quartz.jobStore.dataSource:myDS
#配置数据源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/vincent?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual
问题来了?需要建什么表?表里面有什么字段?字段类型和长度是什么?
在官网的 Downloads 链接中,提供了 11 张表的建表语句
2.3 的版本在这个路径下:src\org\quartz\impl\jdbcjobstore
表名和作用:
QRTZ_BLOB_TRIGGERS:Trigger 作为 Blob 类型存储
QRTZ_CALENDARS:存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS:存储 CronTrigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS:存储与已触发的 Trigger 相关的状态信息,以及相关 Job 的执行信息
QRTZ_JOB_DETAILS:存储每一个已配置的 Job 的详细信息
QRTZ_LOCKS:存储程序的悲观锁的信息
QRTZ_PAUSED_TRIGGER_GRPS:存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE:存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例
QRTZ_SIMPLE_TRIGGERS:存储 SimpleTrigger 的信息,包括重复次数、间隔、以及已触的次数
QRTZ_SIMPROP_TRIGGERS:存储 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 两种类型的触发器
QRTZ_TRIGGERS:存储已配置的 Trigger 的信息
Quartz集成到Spring
Spring 在 spring-context-support.jar 中直接提供了对Quartz 的支持

可以在配置文件中把 JobDetail、Trigger、Scheduler 定义成 Bean,交给Spring去管理
1)定义Job
<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="name" value="my_job_1"/>
<property name="vincent" value="my_group"/>
<property name="jobClass" value="com.vincent.quartz.MyJob1"/>
<property name="durability" value="true"/>
</bean>
2)定义Trigger
<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
<property name="name" value="my_trigger_1"/>
<property name="group" value="my_group"/>
<property name="jobDetail" ref="myJob1"/>
<property name="startDelay" value="1000"/>
<property name="repeatInterval" value="5000"/>
<property name="repeatCount" value="2"/>
</bean>
3)定义Scheduler
<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="simpleTrigger"/>
<ref bean="cronTrigger"/>
</list>
</property>
</bean>
既然可以在配置文件配置,当然也可以用@Bean 注解配置。在配置类上加上@Configuration 让 Spring 读取到
public class QuartzConfig {
@Bean
public JobDetail printTimeJobDetail(){
return JobBuilder.newJob(MyJob1.class)
.withIdentity("vincentJob")
.usingJobData("vincent", "只为更好的你")
.storeDurably()
.build();
}
@Bean
public Trigger printTimeJobTrigger() {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
return TriggerBuilder.newTrigger()
.forJob(printTimeJobDetail())
.withIdentity("quartzTaskService")
.withSchedule(cronScheduleBuilder)
.build();
}
}
public class QuartzTest {
private static Scheduler scheduler;
public static void main(String[] args) throws SchedulerException {
// 获取容器
ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");
// 从容器中获取调度器
scheduler = (StdScheduler) ac.getBean("scheduler");
// 启动调度器
scheduler.start();
}
}
动态调度的实现
1)配置管理
用最简单的数据库的实现
问题 1:建一张什么样的表?参考 JobDetail 的属性
CREATE TABLE `sys_job` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`job_name` varchar(512) NOT NULL COMMENT '任务名称',
`job_group` varchar(512) NOT NULL COMMENT '任务组名',
`job_cron` varchar(512) NOT NULL COMMENT '时间表达式',
`job_class_path` varchar(1024) NOT NULL COMMENT '类路径,全类型',
`job_data_map` varchar(1024) DEFAULT NULL COMMENT '传递 map 参数',
`job_status` int(2) NOT NULL COMMENT '状态:1 启用 0 停用',
`job_describe` varchar(1024) DEFAULT NULL COMMENT '任务功能描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;
2)数据操作与任务调度
操作数据表非常简单,SSM 增删改查
但是在修改了表的数据之后,怎么让调度器知道呢?
调度器的接口:Scheduler
在我们的需求中,我们需要做的事情:
1、 新增一个任务
2、 删除一个任务
3、 启动、停止一个任务
4、 修改任务的信息(包括调度规律)
因 此 可 以 把 相 关 的 操 作 封 装 到 一 个 工 具 类 中:
public class SchedulerUtil {
private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);
/**
* 新增定时任务
* @param jobClassName 类路径
* @param jobName 任务名称
* @param jobGroupName 组别
* @param cronExpression Cron表达式
* @param jobDataMap 需要传递的参数
* @throws Exception
*/
public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
// 启动调度器
scheduler.start();
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
.withIdentity(jobName, jobGroupName).build();
// JobDataMap用于传递任务运行时的参数,比如定时发送邮件,可以用json形式存储收件人等等信息
if (StringUtils.isNotEmpty(jobDataMap)) {
JSONObject jb = JSONObject.parseObject(jobDataMap);
Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
logger.info("创建定时任务失败" + e);
throw new Exception("创建定时任务失败");
}
}
/**
* 停用一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public static void jobPause(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 启用一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public static void jobresume(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 删除一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public static void jobdelete(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 更新定时任务表达式
* @param jobName 任务名称
* @param jobGroupName 组别
* @param cronExpression Cron表达式
* @throws Exception
*/
public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {
try {
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
System.out.println("更新定时任务失败" + e);
throw new Exception("更新定时任务失败");
}
}
/**
* 检查Job是否存在
* @throws Exception
*/
public static Boolean isResume(String jobName, String jobGroupName) throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
Boolean state = scheduler.checkExists(triggerKey);
return state;
}
/**
* 暂停所有任务
* @throws Exception
*/
public static void pauseAlljob() throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseAll();
}
/**
* 唤醒所有任务
* @throws Exception
*/
public static void resumeAlljob() throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
sched.resumeAll();
}
/**
* 获取Job实例
* @param classname
* @return
* @throws Exception
*/
public static BaseJob getClass(String classname) throws Exception {
try {
Class<?> c = Class.forName(classname);
return (BaseJob) c.newInstance();
} catch (Exception e) {
throw new Exception("类["+classname+"]不存在!");
}
}
}
3)容器启动与Service注入
a)容器启动
任务没有定义在 ApplicationContext.xml 中,而是放到了数据库中,SpringBoot 启动时,怎么读取任务信息?怎么在 Spring 启动完成的时候做一些事情?
创建一个类,实现 CommandLineRunner 接口,实现 run方法
从表中查出状态是 1 的任务,然后构建
@Component
public class InitStartSchedule implements CommandLineRunner {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ISysJobService sysJobService;
@Autowired
private MyJobFactory myJobFactory;
@Override
public void run(String... args) throws Exception {
/**
* 用于程序启动时加载定时任务,并执行已启动的定时任务(只会执行一次,在程序启动完执行)
*/
//查询job状态为启用的
HashMap<String,String> map = new HashMap<String,String>();
map.put("jobStatus", "1");
List<SysJob> jobList= sysJobService.querySysJobList(map);
if( null == jobList || jobList.size() ==0){
logger.info("系统启动,没有需要执行的任务... ...");
}
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
// 如果不设置JobFactory,Service注入到Job会报空指针
scheduler.setJobFactory(myJobFactory);
// 启动调度器
scheduler.start();
for (SysJob sysJob:jobList) {
String jobClassName=sysJob.getJobName();
String jobGroupName=sysJob.getJobGroup();
//构建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();
if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {
JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());
Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
//表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());
//按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
// 任务不存在的时候才添加
if( !scheduler.checkExists(jobDetail.getKey()) ){
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
logger.info("\n创建定时任务失败"+e);
throw new Exception("创建定时任务失败");
}
}
}
}
public static BaseJob getClass(String classname) throws Exception
{
Class<?> c= Class.forName(classname);
return (BaseJob)c.newInstance();
}
}
b)Service类注入到Job中
Spring Bean 如何注入到实现了 Job 接口的类中?
如果没有任何配置,注入会报空指针异常
因为定时任务Job 对象的实例化过程是在 Quartz 中进行的,而 Service Bean 是由Spring 容器管理的,Quartz 察觉不到 Service Bean 的存在,所以无法将 Service Bean装配到 Job 对象中
分析:
Quartz 集成到 Spring 中,用到 SchedulerFactoryBean,其实现了 InitializingBean方法,在唯一的方法 afterPropertiesSet()在 Bean 的属性初始化后调用
调度器用 AdaptableJobFactory 对 Job 对象进行实例化,如果我们可以把这个 JobFactory 指定为我们自定义的工厂的话,就可以在 Job 实例化完成之后,把 Job纳入到 Spring 容器中管理
解决:
1)定义一个 AdaptableJobFactory,实现 JobFactory 接口,实现接口定义的newJob 方法,在这里面返回 Job 实例
public class AdaptableJobFactory implements JobFactory {
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler arg1) throws SchedulerException {
return newJob(bundle);
}
public Job newJob(TriggerFiredBundle bundle) throws SchedulerException {
try {
// 返回Job实例
Object jobObject = createJobInstance(bundle);
return adaptJob(jobObject);
}
catch (Exception ex) {
throw new SchedulerException("Job instantiation failed", ex);
}
}
// 通过反射的方式创建实例
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Method getJobDetail = bundle.getClass().getMethod("getJobDetail");
Object jobDetail = ReflectionUtils.invokeMethod(getJobDetail, bundle);
Method getJobClass = jobDetail.getClass().getMethod("getJobClass");
Class jobClass = (Class) ReflectionUtils.invokeMethod(getJobClass, jobDetail);
return jobClass.newInstance();
}
protected Job adaptJob(Object jobObject) throws Exception {
if (jobObject instanceof Job) {
return (Job) jobObject;
}
else if (jobObject instanceof Runnable) {
return new DelegatingJob((Runnable) jobObject);
}
else {
throw new IllegalArgumentException("Unable to execute job class [" + jobObject.getClass().getName() +
"]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
}
}
}
2)定义一个MyJobFactory,继承AdaptableJobFactory
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
3)指定Scheduler的JobFactory为自定义的JobFactory
scheduler.setJobFactory(myJobFactory);
Quartz集群部署
1)为什么要集群
a)防止单点故障,减少对业务的影响
b)减少节点的压力,例如在 11 点要触发 5000 个任务,如果有 10 个节点,则每个节点之需要执行 500 个任务
2)集群需要解决的问题
a)任务重跑,因为节点部署的内容是一样的,到 10 点的时候,每个节点都会执行相同的操作,引起数据混乱,比如跑批,绝对不能执行多次
b)任务漏跑,假如任务是平均分配的,本来应该在某个节点上执行的任务,因为节点故障,一直没有得到执行
c)水平集群需要注意时间同步问题
d)Quartz 使用的是随机的负载均衡算法,不能指定节点执行
所以必须要有一种共享数据或者通信的机制,在分布式系统的不同节点中,我们可以采用什么样的方式,实现数据共享?
两两通信,或者基于分布式的服务,实现数据共享
例如:ZK、Redis、DB
在 Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了
3)集群配置与验证
quartz.properties 配置。
四个配置:集群实例 ID、集群开关、数据库持久化、数据源信息
注意先清空 quartz 所有表、改端口、两个任务频率改成一样
验证 1:先后启动 2 个节点,任务是否重跑
验证 2:停掉一个节点,任务是否漏跑
Quartz调度原理
带着问题看源码:
Job 没有继承 Thread 和实现 Runnable,是怎么被调用的?通过反射还是什么?
任务是什么时候被调度的?是谁在监视任务还是监视 Trigger?
任务是怎么被调用的?谁执行了任务?
任务本身有状态吗?还是触发器有状态?
源码入口:
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
1)获取调度器实例
a、读取配置文件
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
// 读取 quartz.properties 配置文件
initialize();
}
// 这个类是一个 HashMap,用来基于调度器的名称保证调度器的唯一性
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
// 如果调度器已经存在了
if (sched != null) {
// 调度器关闭了,移除
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
// 返回调度器
return sched;
}
}
// 调度器不存在,初始化
sched = instantiate();
return sched;
}
instantiate()方法中做了初始化的所有工作:
// 存储任务信息的 JobStore
JobStore js = null;
// 创建线程池,默认是 SimpleThreadPool
ThreadPool tp = null;
// 创建调度器
QuartzScheduler qs = null;
// 连接数据库的连接管理器
DBConnectionManager dbMgr = null;
// 自动生成 ID
// 创建线程执行器,默认为 DefaultThreadExecutor
ThreadExecutor threadExecutor;
b、创建线程池(包工头)
创建了一个线程池,默认是配置文件中指定的SimpleThreadPool
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
SimpleThreadPool 里面维护了三个 list,分别存放所有的工作线程、空闲的工作线程和忙碌的工作线程,我们可以把 SimpleThreadPool 理解为包工头
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
tp 的 runInThread()方法是线程池运行线程的接口方法。参数 Runnable 是执行的任务内容,取出 WorkerThread 去执行参数里面的 runnable(JobRunShell)
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
c、WorkerThread(工人)
WorkerThread 是 SimpleThreadPool的内部类 , 用来执行任务 ,我 们 把WorkerThread理解为工人。在WorkerThread的run方法中,执行传入的参数runnable任务:runnable.run();
d、创建调度线程(项目经理)
创建了调度器 QuartzScheduler:
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
在 QuartzScheduler 的构造函数中,创建了QuartzSchedulerThread,我们把它理解为项目经理,它会调用包工头的工人资源,给他们安排任务
并且创建了线程执行器 schedThreadExecutor , 执 行 了 这 个QuartzSchedulerThread,也就是调用了它的 run 方法
// 创建一个线程,resouces 里面有线程名称
this.schedThread = new QuartzSchedulerThread(this, resources);
// 线程执行器
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//执行这个线程,也就是调用了线程的 run 方法
schedThreadExecutor.execute(this.schedThread);
在QuartzSchedulerThread 类,找到 run 方法,这个是 Quartz 任务调度的核心方法:
public void run() {
boolean lastAcquireFailed = false;
// 检查 scheuler 是否为停止状态
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
// 检查是否为暂停状态
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
// 暂停的话会尝试去获得信号锁,并 wait 一会
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
//从线程池获取可用的线程
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 获取需要下次执行的 triggers
// idleWaitTime: 默认 30s
// availThreadCount:获取可用(空闲)的工作线程数量,总会大于 1,因为该方法会一直阻塞,直到有工作线程空闲下来。
//maxBatchSize:一次拉取 trigger 的最大数量,默认是 1
//batchTimeWindow:时间窗口调节参数,默认是 0
//misfireThreshold: 超过这个时间还未触发的 trigger,被认为发生了 misfire,默认 60s
//调度线程一次会拉取 NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个 triggers,默认情况下,会拉取未来 30s、过去 60s 之间还未 fire 的 1 个 trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
//触发 Trigger,把 ACQUIRED 状态改成 EXECUTING
//如果这个 trigger 的 NEXTFIRETIME 为空,也就是未来不再触发,就将其状态改为COMPLETE
//如果trigger不允许并发执行(即Job的实现类标注了@DisallowConcurrentExecution),则将状态变为 BLOCKED,否则就将状态改为 WAITING
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
//循环处理 Trigger
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
//根据 trigger 信息实例化JobRunShell(implements Runnable),同时依据JOB_CLASS_NAME 实例化 Job,随后我们将 JobRunShell 实例丢入工作线。
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 执行 JobRunShell 的 run 方法
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
JobRunShell 的作用:
JobRunShell 用来为 Job 提供安全的运行环境的,执行 Job 中所有的作业,捕获运行中的异常,在任务执行完毕的时候更新 Trigger 状态等等
JobRunShell 实例是用 JobRunShellFactory 为 QuartzSchedulerThread 创建的,在调度器决定一个 Job 被触发的时候,它从线程池中取出一个线程来执行任务
e、线程模型总结
SimpleThreadPool:包工头,管理所有 WorkerThread
WorkerThread:工人,把 Job 包装成 JobRunShell,执行
QuartSchedulerThread:项目经理,获取即将触发的 Trigger,从包工头拿出拿到worker,执行 Trigger 绑定的任务
2)绑定JobDetail和Trigger
// 存储 JobDetail 和 Trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知相关的 Listener
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
3)启动调度器
// 通知监听器
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
// 通知 QuartzSchedulerThread 不再等待,开始干活
schedThread.togglePause(false);
// 通知监听器
notifySchedulerListenersStarted();
4)源码总结
getScheduler 方法创建线程池 ThreadPool,创建调度器 QuartzScheduler,创建调度线程 QuartzSchedulerThread,调度线程初始处于暂停状态
scheduleJob 将任务添加到 JobStore 中
scheduler.start()方法激活调度器,QuartzSchedulerThread 从 timeTrriger 取出待触发的任务,并包装成 TriggerFiredBundle,然后由 JobRunShellFactory 创建TriggerFiredBundle 的 执 行 线 程 JobRunShell , 调 度 执 行 通 过 线 程 池SimpleThreadPool去执行JobRunShell,而JobRunShell执行的就是任务类的execute方法:job.execute(JobExecutionContext context)
5)集群原理
基于数据库,如何实现任务的不重跑不漏跑?
问题 1:如果任务执行中的资源是“下一个即将触发的任务”,怎么基于数据库实现这个资源的竞争?
问题 2:怎么对数据的行加锁?

QuartzSchedulerThread 获取下一个即将触发的 Trigger:
triggers = qsRsrcs.getJobStore().acquireNextTriggers()
调用 JobStoreSupport 的 acquireNextTriggers()方法
调用 JobStoreSupport.executeInNonManagedTXLock()方法
return executeInNonManagedTXLock(lockName,
尝试获得锁:
transOwner = getLockHandler().obtainLock(conn, lockName);
调用 DBSemaphore 的 obtainLock()方法:
public boolean obtainLock(Connection conn, String lockName)
throws LockException {
if (!isLockOwner(lockName)) {
executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
调用 StdRowLockSemaphore 的 executeSQL()方法
最终用 JDBC 执行 SQL,语句内容是 expandedSQL 和 expandedInsertSQL
ps = conn.prepareStatement(expandedSQL);
问题:expandedSQL 和 expandedInsertSQL 是一条什么 SQL 语句?似乎我们没有赋值?
在 StdRowLockSemaphore 的构造函数中,把定义的两条 SQL 传进去:
public StdRowLockSemaphore() {
super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK);
}
public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";
它调用了父类 DBSemaphore 的构造函数:
public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
this.tablePrefix = tablePrefix;
this.schedName = schedName;
setSQL(defaultSQL);
setInsertSQL(defaultInsertSQL);
}
在 setSQL()和 setInsertSQL()中为 expandedSQL 和expandedInsertSQL 赋值
执行的 SQL 语句:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
在执行官方的建表脚本的时候,QRTZ_LOCKS 表,它会为每个调度器创建两行数据,获取 Trigger 和触发 Trigger 是两把锁:


6)任务为什么重复执行
有多个调度器,任务没有重复执行,也就是默认会加锁,什么情况下不会上锁呢?
JobStoreSupport 的 executeInNonManagedTXLock()方法,如果 lockName 为空,则不上锁
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
而 上 一 步 JobStoreSupport 的 acquireNextTriggers() 方 法 , 如 果isAcquireTriggersWithinLock()值是 false 并且 maxCount>1 的话,lockName 赋值为null,否则赋值为 LOCK_TRIGGER_ACCESS,这种情况获取 Trigger 下默认不加锁
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
}
acquireTriggersWithinLock 默认是空的:
private boolean acquireTriggersWithinLock = false;
maxCount 来自 QuartzSchedulerThread:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
getMaxBatchSize()来自 QuartzSchedulerResources,代表 Scheduler 一次拉取trigger 的最大数量,默认是 1:
private int maxBatchSize = 1;
这个值可以通过参数修改:
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=50
理论上把 batchTriggerAcquisitionMaxCount 的值改掉以后,在获取 Trigger 的时候就不会再上锁了,但是实际上为什么没有出现频繁的重复执行问题?
因为每个调度器的线程持有锁的时间太短了
QuartzSchedulerThread 的 triggersFired()方法:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
调用了 JobStoreSupport 的 triggersFired()方法,接着又调用了一个 triggerFired,triggerFired(Connection conn, OperableTrigger trigger)方法:
如果 Trigger 的状态不是 ACQUIRED,也就是说被其他的线程 fire 了,返回空。但是这种乐观锁的检查在高并发下难免会出现 ABA 的问题,比如线程 A 拿到的时候还是ACQUIRED 状态,但是刚准备执行的时候已经变成了 EXECUTING 状态,这个时候就会出现重复执行的问题
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
总结:
如果设置的数量>1,并且使用 JDBC JobStore(RAMJobStore 不支持分布式,只有 一 个 调 度 器 实 例 , 所 以 不 加 锁 ) , 则 属 性org.quartz.jobStore.acquireTriggersWithinLock 应设置为 true,否则不加锁会导致任务重复执行
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
org.quartz.jobStore.acquireTriggersWithinLock=true
Quartz-Misfire
什么情况下错过触发?
示例:线程池只有 5 个线程,当有 5 个任务都在执行的时候,第六个任务即将触发,这个时候任务就不能得到执行,在 quartz.properties 有一个属性 misfireThreshold,用来定义触发器超时的"临界值",也就是超过了这个时间,就算错过触发了
例如,如果 misfireThreshold 是 60000(60 秒),9 点整应该执行的任务,9 点零1 分还没有可用线程执行它,就会超时(misfires)
可能造成 misfired job的原因:
1、 没有可用线程
2、 Trigger 被暂停
3、 系统重启
4、 禁止并发执行的任务在到达触发时间时,上次执行还没有结束
错过触发怎么办?
Misfire 策略设置,每一种 Trigger 都定义了自己的 Misfire 策略,不同的策略通过不同的方法来设置
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionNowWithExistingCount()
.withIntervalInSeconds(1)
.repeatForever()).build();
一般来说有 3 种:
1、 忽略
2、 立即跑一次
3、 下次跑
文章参考:
Quartz Scheduler misfireThreshold属性的意义与触发器超时后的处理策略
怎么避免任务错过触发?
合理地设置线程池数量,以及任务触发间隔