Today I came across the definition of a Quartz job class, and it immediately caught my interest.
public class FlowSetup extends QuartzJobBean {
public final Map<StackEnum, TaskFlow> flowMapping = new HashMap<>();
@Autowired
private FlowManager flowManager;
...
}
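The class above carries no annotation at all, so why does @Autowired work in it? If its fields can be autowired, Spring must be creating or managing the instance, yet the class is never picked up by component scanning. So how does it become a bean? The broader question is how Quartz is integrated with Spring Boot. It starts with the starter dependency: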
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
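Following the usual starter mechanism, the
spring.factories file of spring-boot-autoconfigure lists org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration (Spring Boot 2.7 and later keep this list in META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports instead),
so QuartzAutoConfiguration is loaded at startup. Let's look at QuartzAutoConfiguration: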
@Configuration
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class,
PlatformTransactionManager.class })
// Enable binding of QuartzProperties.class
@EnableConfigurationProperties(QuartzProperties.class)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class,
HibernateJpaAutoConfiguration.class })
public class QuartzAutoConfiguration {
private final QuartzProperties properties;
// QuartzProperties is injected through the constructor
public QuartzAutoConfiguration(QuartzProperties properties,
ObjectProvider<SchedulerFactoryBeanCustomizer> customizers,
ObjectProvider<JobDetail[]> jobDetails,
ObjectProvider<Map<String, Calendar>> calendars,
ObjectProvider<Trigger[]> triggers, ApplicationContext applicationContext) {
this.properties = properties;
this.customizers = customizers;
this.jobDetails = jobDetails.getIfAvailable();
this.calendars = calendars.getIfAvailable();
this.triggers = triggers.getIfAvailable();
this.applicationContext = applicationContext;
}
// Define a SchedulerFactoryBean, provided that no SchedulerFactoryBean bean exists yet
@Bean
@ConditionalOnMissingBean
public SchedulerFactoryBean quartzScheduler() {
// Create a new SchedulerFactoryBean
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(this.applicationContext);
schedulerFactoryBean.setJobFactory(jobFactory);
// Many of the settings that would otherwise go into quartz.properties can be applied here
if (this.properties.getSchedulerName() != null) {
schedulerFactoryBean.setSchedulerName(this.properties.getSchedulerName());
}
schedulerFactoryBean.setAutoStartup(this.properties.isAutoStartup());
schedulerFactoryBean
.setStartupDelay((int) this.properties.getStartupDelay().getSeconds());
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(
this.properties.isWaitForJobsToCompleteOnShutdown());
schedulerFactoryBean
.setOverwriteExistingJobs(this.properties.isOverwriteExistingJobs());
if (!this.properties.getProperties().isEmpty()) {
schedulerFactoryBean
.setQuartzProperties(asProperties(this.properties.getProperties()));
}
if (this.jobDetails != null && this.jobDetails.length > 0) {
schedulerFactoryBean.setJobDetails(this.jobDetails);
}
if (this.calendars != null && !this.calendars.isEmpty()) {
schedulerFactoryBean.setCalendars(this.calendars);
}
if (this.triggers != null && this.triggers.length > 0) {
schedulerFactoryBean.setTriggers(this.triggers);
}
customize(schedulerFactoryBean);
return schedulerFactoryBean;
}
}
First, a SchedulerFactoryBean is defined. As you can see, it
implements FactoryBean, which means it is the producer of the Scheduler bean:
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
// getObject() is the interface method of FactoryBean<Scheduler>
public Scheduler getObject() {
return this.scheduler;
}
}
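On the difference between FactoryBean and BeanFactory: apart from the swapped word order in their names, the two are not directly related. BeanFactory is the container itself; as the name suggests, it is the factory that creates beans and hands them out. A FactoryBean is a bean registered in that container whose job is to produce another object: the container exposes the result of getObject() as the bean, not the FactoryBean itself.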
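To make the distinction concrete, here is a minimal plain-Java sketch. It does not use Spring: ProductFactory, TinyContainer and FakeSchedulerFactory are hypothetical stand-ins invented for illustration, and the real container does considerably more work than this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for FactoryBean<T>: a bean whose job is to produce another object.
interface ProductFactory<T> {
    T getObject();
}

// Hypothetical stand-in for SchedulerFactoryBean: it holds the product and returns it.
final class FakeSchedulerFactory implements ProductFactory<String> {
    private final String scheduler = "scheduler-instance";

    @Override
    public String getObject() {
        return this.scheduler;
    }
}

// Hypothetical stand-in for a BeanFactory: the container that stores beans and hands them out.
final class TinyContainer {
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // If the registered bean is a factory, expose its product instead of the factory itself.
    Object getBean(String name) {
        Object bean = beans.get(name);
        if (bean instanceof ProductFactory) {
            return ((ProductFactory<?>) bean).getObject();
        }
        return bean;
    }
}

final class FactoryBeanSketch {
    public static void main(String[] args) {
        TinyContainer container = new TinyContainer();
        container.register("quartzScheduler", new FakeSchedulerFactory());
        // The lookup yields the product, not the FakeSchedulerFactory.
        System.out.println(container.getBean("quartzScheduler"));
    }
}
```

The point of the sketch is only the branch in getBean: the caller asks for a bean and receives whatever the factory produces.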
QuartzAutoConfiguration also uses the inner class SchedulerDependsOnBeanFactoryPostProcessor to make the Scheduler and SchedulerFactoryBean beans depend on beans of the given types:
/**
* {@link AbstractDependsOnBeanFactoryPostProcessor} for Quartz {@link Scheduler} and
* {@link SchedulerFactoryBean}.
*/
private static class SchedulerDependsOnBeanFactoryPostProcessor extends AbstractDependsOnBeanFactoryPostProcessor {
SchedulerDependsOnBeanFactoryPostProcessor(Class<?>... dependencyTypes) {
super(Scheduler.class, SchedulerFactoryBean.class, dependencyTypes);
}
}
SchedulerDependsOnBeanFactoryPostProcessor is registered as a bean in QuartzSchedulerDependencyConfiguration.
@Configuration(proxyBeanMethods = false)
static class QuartzSchedulerDependencyConfiguration {
@Bean
static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerDataSourceInitializerDependsOnBeanFactoryPostProcessor() {
return new SchedulerDependsOnBeanFactoryPostProcessor(QuartzDataSourceInitializer.class);
}
@Bean
@ConditionalOnBean(FlywayMigrationInitializer.class)
static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerFlywayDependsOnBeanFactoryPostProcessor() {
return new SchedulerDependsOnBeanFactoryPostProcessor(FlywayMigrationInitializer.class);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(SpringLiquibase.class)
static class LiquibaseQuartzSchedulerDependencyConfiguration {
@Bean
@ConditionalOnBean(SpringLiquibase.class)
static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerLiquibaseDependsOnBeanFactoryPostProcessor() {
return new SchedulerDependsOnBeanFactoryPostProcessor(SpringLiquibase.class);
}
}
}
}
QuartzAutoConfiguration also configures a SchedulerFactoryBeanCustomizer, which sets the DataSource on the schedulerFactoryBean:
@Configuration(proxyBeanMethods = false)
@ConditionalOnSingleCandidate(DataSource.class)
@ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc")
protected static class JdbcStoreTypeConfiguration {
@Bean
@Order(0)
public SchedulerFactoryBeanCustomizer dataSourceCustomizer(QuartzProperties properties, DataSource dataSource,
@QuartzDataSource ObjectProvider<DataSource> quartzDataSource,
ObjectProvider<PlatformTransactionManager> transactionManager,
@QuartzTransactionManager ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
return (schedulerFactoryBean) -> {
DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
schedulerFactoryBean.setDataSource(dataSourceToUse);
PlatformTransactionManager txManager = getTransactionManager(transactionManager,
quartzTransactionManager);
if (txManager != null) {
schedulerFactoryBean.setTransactionManager(txManager);
}
};
}
private DataSource getDataSource(DataSource dataSource, ObjectProvider<DataSource> quartzDataSource) {
DataSource dataSourceIfAvailable = quartzDataSource.getIfAvailable();
return (dataSourceIfAvailable != null) ? dataSourceIfAvailable : dataSource;
}
private PlatformTransactionManager getTransactionManager(
ObjectProvider<PlatformTransactionManager> transactionManager,
ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
PlatformTransactionManager transactionManagerIfAvailable = quartzTransactionManager.getIfAvailable();
return (transactionManagerIfAvailable != null) ? transactionManagerIfAvailable
: transactionManager.getIfUnique();
}
@Bean
@ConditionalOnMissingBean
public QuartzDataSourceInitializer quartzDataSourceInitializer(DataSource dataSource,
@QuartzDataSource ObjectProvider<DataSource> quartzDataSource, ResourceLoader resourceLoader,
QuartzProperties properties) {
DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
return new QuartzDataSourceInitializer(dataSourceToUse, resourceLoader, properties);
}
}
The customizers are applied in customizers.orderedStream().forEach((customizer) -> customizer.customize(schedulerFactoryBean));
The DataSource comes from DataSourceAutoConfiguration, so we need to define a data source in application.yaml:
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/MYSLQ_local?serverTimezone=GMT-7&useLegacyDatetimeCode=false
    username: MYSLQ_local
    password: MYSLQ_local_local
  jpa:
    database-platform: org.hibernate.dialect.MySQL8Dialect
Next, look at @ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc"): this configuration only takes effect when spring.quartz.job-store-type = jdbc. So application.yaml needs the following:
spring:
  quartz:
    properties:
      org:
        quartz:
          jobStore:
            tablePrefix: cpm_qrtz_
            isClustered: true
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 50
            threadsInheritContextClassLoaderOfInitializingThread: true
          scheduler:
            instanceId: AUTO
    job-store-type: jdbc
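Someone may ask: apart from job-store-type, when do the other settings above get picked up, and how do they take effect? First look at @EnableConfigurationProperties(QuartzProperties.class): it binds everything under spring.quartz into the QuartzProperties object, and the entries under spring.quartz.properties end up in its properties map.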
@ConfigurationProperties("spring.quartz")
public class QuartzProperties {
private final Map<String, String> properties = new HashMap<>();
public Map<String, String> getProperties() {
return this.properties;
}
}
That properties map is then handed to the schedulerFactoryBean in the QuartzAutoConfiguration.quartzScheduler method:
if (!properties.getProperties().isEmpty()) {
schedulerFactoryBean.setQuartzProperties(asProperties(properties.getProperties()));
}
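The effect of that binding can be sketched in plain Java. This is not Spring Boot's binder: QuartzPropertiesSketch and its bind method are hypothetical, and only the final copy into java.util.Properties resembles what the asProperties call above has to achieve.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

final class QuartzPropertiesSketch {
    static final String PREFIX = "spring.quartz.properties.";

    // Keep only the keys under the prefix and strip it, which is the visible result of the binding.
    static Map<String, String> bind(Map<String, String> environment) {
        Map<String, String> bound = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : environment.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                bound.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return bound;
    }

    // Copy the map into java.util.Properties, the type Quartz is configured with.
    static Properties asProperties(Map<String, String> source) {
        Properties properties = new Properties();
        properties.putAll(source);
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> environment = new LinkedHashMap<>();
        environment.put("spring.quartz.job-store-type", "jdbc");
        environment.put("spring.quartz.properties.org.quartz.jobStore.isClustered", "true");
        environment.put("spring.quartz.properties.org.quartz.scheduler.instanceId", "AUTO");
        // Only the two org.quartz.* entries survive; job-store-type is a separate setting.
        System.out.println(asProperties(bind(environment)));
    }
}
```

Note that job-store-type is not part of the map: it sits directly under spring.quartz and is consumed by the @ConditionalOnProperty shown earlier.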
Create Scheduler
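Suppose some code injects a Scheduler with @Autowired, like this:
@Autowired
Scheduler scheduler;
The BeanFactory resolves the injection point by its type, Scheduler, and finds the SchedulerFactoryBean because it is a FactoryBean<Scheduler>. The instance is ultimately obtained from SchedulerFactoryBean.getObject().
Reading the source shows that getObject() simply returns this.scheduler.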
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
private Scheduler scheduler;
public Scheduler getObject() {
return this.scheduler;
}
}
But where is this Scheduler constructed? SchedulerFactoryBean implements InitializingBean, and InitializingBean declares afterPropertiesSet():
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
public void afterPropertiesSet() throws Exception {
...
// Initialize the Scheduler instance...
// The Scheduler is initialized here
this.scheduler = prepareScheduler(prepareSchedulerFactory());
try {
registerListeners();
registerJobsAndTriggers();
}
catch (Exception ex) {
...
}
}
}
A simplified view of the bean lifecycle follows; note where InitializingBean#afterPropertiesSet() sits:
->constructor
->property population (all kinds of @Autowired injection)
->BeanPostProcessor#postProcessBeforeInitialization
->@PostConstruct, then InitializingBean#afterPropertiesSet(), then @Bean(initMethod="xxx")
->BeanPostProcessor#postProcessAfterInitialization
->@PreDestroy
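The ordering above can be simulated in a few lines of plain Java. This is only an illustration: LifecycleSketch and RecordingBean are hypothetical, and the callbacks are invoked by hand in the order the list describes rather than by a real container.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
    // Hypothetical bean that records every lifecycle step it goes through.
    static final class RecordingBean {
        private final List<String> events;
        private String dependency;

        RecordingBean(List<String> events) {
            this.events = events;
            events.add("constructor");
        }

        void setDependency(String dependency) {
            this.dependency = dependency;
            events.add("property population");
        }

        void afterPropertiesSet() {
            // By now the dependency is set, which is why initialization work belongs here.
            events.add("afterPropertiesSet(" + dependency + ")");
        }
    }

    // Plays the role of the container: drives the callbacks in the documented order.
    static List<String> createBean() {
        List<String> events = new ArrayList<>();
        RecordingBean bean = new RecordingBean(events);
        bean.setDependency("flowManager");
        events.add("postProcessBeforeInitialization");
        bean.afterPropertiesSet();
        events.add("postProcessAfterInitialization");
        return events;
    }

    public static void main(String[] args) {
        createBean().forEach(System.out::println);
    }
}
```

The detail that matters for SchedulerFactoryBean is that afterPropertiesSet() runs after all properties have been set, so the Scheduler can be built from a fully configured factory bean.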
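Now look at prepareSchedulerFactory(). It returns a SchedulerFactory, which is passed to prepareScheduler(); the Scheduler that prepareScheduler() returns is what gets assigned to the scheduler field of SchedulerFactoryBean.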
public class SchedulerFactoryBean{
private Class<? extends SchedulerFactory> schedulerFactoryClass =
StdSchedulerFactory.class;
private SchedulerFactory schedulerFactory;
private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
// This SchedulerFactory is null by default; it can be made non-null, for example through a SchedulerFactoryBeanCustomizer
SchedulerFactory schedulerFactory = this.schedulerFactory;
if (schedulerFactory == null) {
// Create local SchedulerFactory instance (typically a StdSchedulerFactory)
// As the comment above says, a StdSchedulerFactory is instantiated here
schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
if (schedulerFactory instanceof StdSchedulerFactory) {
// Call initSchedulerFactory to populate the StdSchedulerFactory
// Anyone who has seen the official Quartz demos knows that StdSchedulerFactory is what produces the scheduler
initSchedulerFactory((StdSchedulerFactory) schedulerFactory);
}
else if (this.configLocation != null || this.quartzProperties != null ||
this.taskExecutor != null || this.dataSource != null) {
throw new IllegalArgumentException(
"StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory);
}
// Otherwise, no local settings to be applied via StdSchedulerFactory.initialize(Properties)
}
// Otherwise, assume that externally provided factory has been initialized with appropriate settings
return schedulerFactory;
}
}
initSchedulerFactory mainly applies the configuration to the schedulerFactory:
private void initSchedulerFactory(StdSchedulerFactory schedulerFactory) throws SchedulerException, IOException {
Properties mergedProps = new Properties();
...
CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
...
// This is the important part:
// entries in application.properties under spring.quartz.properties.xxx directly set what would otherwise go into quartz.properties
schedulerFactory.initialize(mergedProps);
}
CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
lets the settings read from application.yaml override Quartz's defaults, which is how the application.yaml configuration takes effect.
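The override behaviour is easy to see in a plain-Java sketch. MergeSketch and mergeInto are hypothetical names, not Spring's CollectionUtils, and the default values are illustrative assumptions; the only thing the sketch demonstrates is that a later put for the same key wins.

```java
import java.util.Properties;

final class MergeSketch {
    // Copy every entry of 'overrides' into 'target'; an existing key is replaced.
    static void mergeInto(Properties overrides, Properties target) {
        for (String name : overrides.stringPropertyNames()) {
            target.setProperty(name, overrides.getProperty(name));
        }
    }

    static Properties merged() {
        // Illustrative defaults, assumed here for the sake of the example.
        Properties mergedProps = new Properties();
        mergedProps.setProperty("org.quartz.threadPool.threadCount", "10");
        mergedProps.setProperty("org.quartz.scheduler.instanceName", "quartzScheduler");

        // Values that came from spring.quartz.properties.* in application.yaml.
        Properties fromYaml = new Properties();
        fromYaml.setProperty("org.quartz.threadPool.threadCount", "50");

        mergeInto(fromYaml, mergedProps);
        return mergedProps;
    }

    public static void main(String[] args) {
        Properties result = merged();
        System.out.println(result.getProperty("org.quartz.threadPool.threadCount"));
        System.out.println(result.getProperty("org.quartz.scheduler.instanceName"));
    }
}
```

Keys that application.yaml does not mention keep their defaults, so only the settings you actually write are changed.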
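prepareScheduler(prepareSchedulerFactory())
After a series of calls this reaches StdSchedulerFactory's private Scheduler instantiate().
It is a very long method,
but the logic is fairly simple: all sorts of initialization. Only the jobStore configuration is listed below: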
private Scheduler instantiate() throws SchedulerException {
// cfg holds the mergedProps from the previous snippet
if (cfg == null) {
initialize();
}
// The job store; RAMJobStore is the default when none is configured
// In application.properties it can be configured as
// spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
RAMJobStore.class.getName());
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' could not be instantiated.", e);
throw initException;
}
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
// This picks up spring.quartz.properties.org.quartz.jobStore.xxx
// where xxx is a property of the job store class, such as isClustered or clusterCheckinInterval
// In application.properties: spring.quartz.properties.org.quartz.jobStore.isClustered = true
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
setBeanProps(js, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' props could not be configured.", e);
throw initException;
}
}
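The two steps in the snippet above, extracting a property group by prefix and then applying it through setters, can be sketched with plain reflection. Everything here is hypothetical (FakeJobStore, BeanPropsSketch, propertyGroup, applyProps) and much simpler than Quartz's own implementation; it only supports boolean, long, int and String setters.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical job store with two setters named after the properties mentioned in the article.
final class FakeJobStore {
    boolean clustered;
    long clusterCheckinInterval;

    public void setIsClustered(boolean clustered) { this.clustered = clustered; }
    public void setClusterCheckinInterval(long millis) { this.clusterCheckinInterval = millis; }
}

final class BeanPropsSketch {
    // Collect the entries under "prefix." and strip that prefix from their keys.
    static Properties propertyGroup(Properties all, String prefix) {
        Properties group = new Properties();
        String dotted = prefix + ".";
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(dotted)) {
                group.setProperty(name.substring(dotted.length()), all.getProperty(name));
            }
        }
        return group;
    }

    // For each key "xxx", call the single-argument setter "setXxx" with the converted value.
    static void applyProps(Object target, Properties props) {
        for (String name : props.stringPropertyNames()) {
            String setterName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            Method setter = null;
            for (Method candidate : target.getClass().getMethods()) {
                if (candidate.getName().equals(setterName) && candidate.getParameterCount() == 1) {
                    setter = candidate;
                    break;
                }
            }
            if (setter == null) {
                throw new IllegalArgumentException("No setter for property '" + name + "'");
            }
            String raw = props.getProperty(name);
            Class<?> type = setter.getParameterTypes()[0];
            Object value = raw;
            if (type == boolean.class) {
                value = Boolean.parseBoolean(raw);
            } else if (type == long.class) {
                value = Long.parseLong(raw);
            } else if (type == int.class) {
                value = Integer.parseInt(raw);
            }
            try {
                setter.invoke(target, value);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Could not set property '" + name + "'", ex);
            }
        }
    }

    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.setProperty("org.quartz.jobStore.isClustered", "true");
        cfg.setProperty("org.quartz.jobStore.clusterCheckinInterval", "10000");
        cfg.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        FakeJobStore store = new FakeJobStore();
        applyProps(store, propertyGroup(cfg, "org.quartz.jobStore"));
        System.out.println(store.clustered + " " + store.clusterCheckinInterval);
    }
}
```

This is why a key such as isClustered needs no dedicated Spring Boot property: any setter on the job store class can be reached by name.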
So to configure clustering, application.properties can look like this:
spring.quartz.properties.org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
spring.quartz.properties.org.quartz.jobStore.isClustered = true
spring.quartz.properties.org.quartz.jobStore.clusterCheckinInterval = 10000
spring.quartz.properties.org.quartz.scheduler.instanceId = AUTO
Now look at the prepareScheduler method:
private Scheduler prepareScheduler(SchedulerFactory schedulerFactory) throws SchedulerException {
if (this.resourceLoader != null) {
// Make given ResourceLoader available for SchedulerFactory configuration.
configTimeResourceLoaderHolder.set(this.resourceLoader);
}
if (this.taskExecutor != null) {
// Make given TaskExecutor available for SchedulerFactory configuration.
configTimeTaskExecutorHolder.set(this.taskExecutor);
}
if (this.dataSource != null) {
// Make given DataSource available for SchedulerFactory configuration.
configTimeDataSourceHolder.set(this.dataSource);
}
if (this.nonTransactionalDataSource != null) {
// Make given non-transactional DataSource available for SchedulerFactory configuration.
configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
}
// Get Scheduler instance from SchedulerFactory.
try {
Scheduler scheduler = createScheduler(schedulerFactory, this.schedulerName);
populateSchedulerContext(scheduler);
if (!this.jobFactorySet && !(scheduler instanceof RemoteScheduler)) {
// Use AdaptableJobFactory as default for a local Scheduler, unless when
// explicitly given a null value through the "jobFactory" bean property.
this.jobFactory = new AdaptableJobFactory();
}
if (this.jobFactory != null) {
if (this.applicationContext != null && this.jobFactory instanceof ApplicationContextAware) {
((ApplicationContextAware) this.jobFactory).setApplicationContext(this.applicationContext);
}
if (this.jobFactory instanceof SchedulerContextAware) {
((SchedulerContextAware) this.jobFactory).setSchedulerContext(scheduler.getContext());
}
scheduler.setJobFactory(this.jobFactory);
}
return scheduler;
}
finally {
if (this.resourceLoader != null) {
configTimeResourceLoaderHolder.remove();
}
if (this.taskExecutor != null) {
configTimeTaskExecutorHolder.remove();
}
if (this.dataSource != null) {
configTimeDataSourceHolder.remove();
}
if (this.nonTransactionalDataSource != null) {
configTimeNonTransactionalDataSourceHolder.remove();
}
}
}
The most important call here is createScheduler(schedulerFactory, this.schedulerName).
As shown above, this schedulerFactory is an instance of StdSchedulerFactory.
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, @Nullable String schedulerName)
throws SchedulerException {
// Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
Thread currentThread = Thread.currentThread();
ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
boolean overrideClassLoader = (this.resourceLoader != null &&
this.resourceLoader.getClassLoader() != threadContextClassLoader);
if (overrideClassLoader) {
currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
}
try {
SchedulerRepository repository = SchedulerRepository.getInstance();
synchronized (repository) {
Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
Scheduler newScheduler = schedulerFactory.getScheduler();
if (newScheduler == existingScheduler) {
throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
}
if (!this.exposeSchedulerInRepository) {
// Need to remove it in this case, since Quartz shares the Scheduler instance by default!
SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
}
return newScheduler;
}
}
finally {
if (overrideClassLoader) {
// Reset original thread context ClassLoader.
currentThread.setContextClassLoader(threadContextClassLoader);
}
}
}
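The class loader handling at the top and bottom of createScheduler is a general pattern: swap the thread context class loader, run the work, and always restore the original in a finally block. A minimal sketch, with the hypothetical helper withContextClassLoader:

```java
import java.util.function.Supplier;

final class ContextClassLoaderSketch {
    // Run 'action' with 'loader' as the thread context class loader, then restore the original.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread currentThread = Thread.currentThread();
        ClassLoader original = currentThread.getContextClassLoader();
        boolean override = (loader != null && loader != original);
        if (override) {
            currentThread.setContextClassLoader(loader);
        }
        try {
            return action.get();
        } finally {
            if (override) {
                // Restore even if the action threw an exception.
                currentThread.setContextClassLoader(original);
            }
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ContextClassLoaderSketch.class.getClassLoader();
        ClassLoader seen = withContextClassLoader(loader,
                () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == loader);
    }
}
```

The finally block is the important part: a scheduler that fails to start must not leave the calling thread with a foreign class loader.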
Let's look at the StdSchedulerFactory::getScheduler method:
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
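getScheduler follows a lookup-or-create pattern: reuse a live scheduler registered under the same name, discard one that has been shut down, and otherwise build a new one. A simplified sketch, in which SchedulerRegistrySketch and FakeScheduler are hypothetical and the registration step is folded into the same method:

```java
import java.util.HashMap;
import java.util.Map;

final class SchedulerRegistrySketch {
    // Hypothetical scheduler: only a name and a shutdown flag.
    static final class FakeScheduler {
        final String name;
        boolean shutdown;

        FakeScheduler(String name) {
            this.name = name;
        }
    }

    private final Map<String, FakeScheduler> repository = new HashMap<>();

    synchronized FakeScheduler getScheduler(String name) {
        FakeScheduler existing = repository.get(name);
        if (existing != null) {
            if (existing.shutdown) {
                // A shut-down scheduler is useless, so forget it and create a new one below.
                repository.remove(name);
            } else {
                return existing;
            }
        }
        FakeScheduler created = new FakeScheduler(name);
        repository.put(name, created);
        return created;
    }

    public static void main(String[] args) {
        SchedulerRegistrySketch registry = new SchedulerRegistrySketch();
        FakeScheduler first = registry.getScheduler("quartzScheduler");
        System.out.println(first == registry.getScheduler("quartzScheduler"));
        first.shutdown = true;
        System.out.println(first == registry.getScheduler("quartzScheduler"));
    }
}
```

This also explains the check in createScheduler above: if the factory hands back a scheduler that was already in the repository, another scheduler with the same name is already active.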
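Back in the instantiate method: it sets up all the startup parameters, such as the thread pool and job store classes. The method is extremely long, so I won't go through it in detail; the key line is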
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
The QuartzScheduler constructor starts a task-dispatching thread, QuartzSchedulerThread:
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
The QuartzSchedulerThread::run method is very long, but I still want to list it in full.
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
try {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
} catch (Exception ignore) {
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
} catch (RuntimeException e) {
if (acquiresFailed == 0) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
Finding the job triggers that are ready to run
The run method does several things. The first is to find the job triggers that are ready to run:
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
Why does a given firing of a Quartz job run on only one machine in a cluster? The answer lies mainly in qsRsrcs.getJobStore().acquireNextTriggers:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) {
if (fireInstanceIds.contains(tr.getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}
Wrapped in this lock, the real acquireNextTrigger is executed:
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
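The shape of executeInNonManagedTXLock, an optional lock taken around a callback and always released afterwards, can be sketched as follows. The sketch uses an in-JVM ReentrantLock purely as a stand-in; LockedCallbackSketch and executeInLock are hypothetical, and a clustered job store has to coordinate through the shared database rather than through a lock in one JVM.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

final class LockedCallbackSketch {
    // Stand-in for the TRIGGER_ACCESS lock.
    private final ReentrantLock triggerAccess = new ReentrantLock();

    boolean isLockHeldByCurrentThread() {
        return triggerAccess.isHeldByCurrentThread();
    }

    // Run the callback, holding the lock only when the caller asked for it (lockName != null above).
    <T> T executeInLock(boolean useLock, Supplier<T> callback) {
        if (useLock) {
            triggerAccess.lock();
        }
        try {
            return callback.get();
        } finally {
            if (useLock) {
                triggerAccess.unlock();
            }
        }
    }

    public static void main(String[] args) {
        LockedCallbackSketch sketch = new LockedCallbackSketch();
        System.out.println(sketch.executeInLock(true, sketch::isLockHeldByCurrentThread));
        System.out.println(sketch.executeInLock(false, sketch::isLockHeldByCurrentThread));
    }
}
```

The useLock flag mirrors the lockName decision in acquireNextTriggers: the lock is requested when isAcquireTriggersWithinLock() is true or more than one trigger is acquired per batch.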
Now for the actual database logic:
// FUTURE_TODO: this really ought to return something like a FiredTriggerBundle,
// so that the fireInstanceId doesn't have to be on the trigger...
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException {
if (timeWindow < 0) {
throw new IllegalArgumentException();
}
List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
return acquiredTriggers;
long batchEnd = noLaterThan;
for(TriggerKey triggerKey: keys) {
// If our trigger is no longer available, try a new one.
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
if(nextTrigger == null) {
continue; // next trigger
}
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = nextTrigger.getJobKey();
JobDetail job;
try {
job = retrieveJob(conn, jobKey);
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
} catch (SQLException sqle) {
getLog().error("Unable to set trigger state to ERROR.", sqle);
}
continue;
}
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
continue; // next trigger
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
Date nextFireTime = nextTrigger.getNextFireTime();
// A trigger should not return NULL on nextFireTime when fetched from DB.
// But for whatever reason if we do have this (BAD trigger implementation or
// data?), we then should log a warning and continue to next trigger.
// User would need to manually fix these triggers from DB as they will not
// able to be clean up by Quartz since we are not returning it to be processed.
if (nextFireTime == null) {
log.warn("Trigger {} returned null on nextFireTime and yet still exists in DB!",
nextTrigger.getKey());
continue;
}
if (nextFireTime.getTime() > batchEnd) {
break;
}
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
if(acquiredTriggers.isEmpty()) {
batchEnd = Math.max(nextFireTime.getTime(), System.currentTimeMillis()) + timeWindow;
}
acquiredTriggers.add(nextTrigger);
}
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
// We are done with the while loop.
break;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
// Return the acquired trigger list
return acquiredTriggers;
}
Two calls matter most.
Select the candidate triggers:
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
Mark the trigger as taken, so that other instances will not pick it:
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
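The state change from WAITING to ACQUIRED is a conditional update: it only succeeds if the trigger is still in the expected state, and the number of updated rows tells the caller whether it won. The same idea can be shown in memory with ConcurrentMap.replace(key, oldValue, newValue). TriggerAcquireSketch is a hypothetical stand-in for the triggers table, not Quartz code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TriggerAcquireSketch {
    static final String WAITING = "WAITING";
    static final String ACQUIRED = "ACQUIRED";

    // Stand-in for the triggers table: trigger key -> current state.
    private final ConcurrentMap<String, String> triggerStates = new ConcurrentHashMap<>();

    void addTrigger(String key) {
        triggerStates.put(key, WAITING);
    }

    // Mirrors a conditional update "set state to ACQUIRED where state is WAITING":
    // returns 1 if this caller changed the state, 0 if someone else got there first.
    int updateStateFromWaitingToAcquired(String key) {
        return triggerStates.replace(key, WAITING, ACQUIRED) ? 1 : 0;
    }

    public static void main(String[] args) {
        TriggerAcquireSketch table = new TriggerAcquireSketch();
        table.addTrigger("flowTrigger");
        int instanceA = table.updateStateFromWaitingToAcquired("flowTrigger");
        int instanceB = table.updateStateFromWaitingToAcquired("flowTrigger");
        System.out.println(instanceA + " " + instanceB);
    }
}
```

The second caller sees zero updated rows, which corresponds to the rowsUpdated <= 0 branch above: it skips this trigger and moves on to the next one.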
Running the job
Back to the run method of QuartzSchedulerThread shown earlier:
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
The key part is the following, which actually runs a job:
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
Let's look at shell.initialize(qs):
public void initialize(QuartzScheduler sched)
throws SchedulerException {
this.qs = sched;
Job job = null;
JobDetail jobDetail = firedTriggerBundle.getJobDetail();
try {
job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
} catch (SchedulerException se) {
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
} catch (Throwable ncdfe) { // such as NoClassDefFoundError
SchedulerException se = new SchedulerException(
"Problem instantiating class '"
+ jobDetail.getJobClass().getName() + "' - ", ncdfe);
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
}
this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}
Here sched.getJobFactory() is used to create the job. Where was this jobFactory set? Go back to where we started, QuartzAutoConfiguration::quartzScheduler:
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
schedulerFactoryBean.setJobFactory(jobFactory);
So the jobFactory is a SpringBeanJobFactory, and the job instance is created in SpringBeanJobFactory::createJobInstance:
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object job = (this.applicationContext != null ?
this.applicationContext.getAutowireCapableBeanFactory().createBean(
bundle.getJobDetail().getJobClass(), AutowireCapableBeanFactory.AUTOWIRE_CONSTRUCTOR, false) :
super.createJobInstance(bundle));
if (isEligibleForPropertyPopulation(job)) {
BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job);
MutablePropertyValues pvs = new MutablePropertyValues();
if (this.schedulerContext != null) {
pvs.addPropertyValues(this.schedulerContext);
}
pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap());
pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
if (this.ignoredUnknownProperties != null) {
for (String propName : this.ignoredUnknownProperties) {
if (pvs.contains(propName) && !bw.isWritableProperty(propName)) {
pvs.removePropertyValue(propName);
}
}
bw.setPropertyValues(pvs);
}
else {
bw.setPropertyValues(pvs, true);
}
}
return job;
}
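In its very first statement it asks the Spring bean factory to create an instance of the job class carried by the TriggerFiredBundle, with autowiring applied, and this is where it finally clicked. It is why the QuartzJobBean at the beginning of this article can use @Autowired fields without any annotation on the class: the instance is not found by scanning, it is created on demand by Spring each time the job fires.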
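The mechanism can be imitated in plain Java with reflection. This is a sketch under stated assumptions, not Spring's implementation: the @InjectMe annotation plays the role of @Autowired, InjectingJobFactory plays the role of the job factory, and FlowManagerStub and DemoJob are hypothetical classes modeled on the example at the top of the article.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker playing the role of @Autowired.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface InjectMe {}

// Hypothetical dependency, standing in for FlowManager.
final class FlowManagerStub {
    String describe() { return "flow-manager"; }
}

// A job class that is never registered anywhere, like FlowSetup in the article.
final class DemoJob {
    @InjectMe
    private FlowManagerStub flowManager;

    String run() { return "ran with " + flowManager.describe(); }
}

final class InjectingJobFactory {
    // Stand-in for the application context: known beans by type.
    private final Map<Class<?>, Object> context = new HashMap<>();

    <T> void registerBean(Class<T> type, T bean) {
        context.put(type, bean);
    }

    // Instantiate the job class on demand, then fill every @InjectMe field from the context.
    <T> T newJob(Class<T> jobClass) {
        try {
            Constructor<T> constructor = jobClass.getDeclaredConstructor();
            constructor.setAccessible(true);
            T job = constructor.newInstance();
            for (Field field : jobClass.getDeclaredFields()) {
                if (field.isAnnotationPresent(InjectMe.class)) {
                    Object dependency = context.get(field.getType());
                    if (dependency == null) {
                        throw new IllegalStateException("No bean of type " + field.getType().getName());
                    }
                    field.setAccessible(true);
                    field.set(job, dependency);
                }
            }
            return job;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Could not create job " + jobClass.getName(), ex);
        }
    }
}

final class JobFactorySketch {
    public static void main(String[] args) {
        InjectingJobFactory factory = new InjectingJobFactory();
        factory.registerBean(FlowManagerStub.class, new FlowManagerStub());
        // DemoJob was never registered as a bean, yet its field is injected.
        System.out.println(factory.newJob(DemoJob.class).run());
    }
}
```

The job class only has to be known to the scheduler through its JobDetail. The factory receives the class, creates a fresh instance per firing, and wires it from the context it was given.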
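The call to qsRsrcs.getThreadPool().runInThread(shell)
is what actually runs the job. There is more logic afterwards, such as releasing the trigger, which I won't cover in detail. This article only walks through the main path of how Quartz is integrated with Spring Boot.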