How Spring Boot Quartz Works

Today I came across the definition of a Quartz job class, and it immediately caught my interest.

public class FlowSetup extends QuartzJobBean {

    public final Map<StackEnum, TaskFlow> flowMapping = new HashMap<>();

    @Autowired
    private FlowManager flowManager;
...
}

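The class above carries no stereotype annotation, so why does @Autowired work in it? If its fields can be autowired, the object must be handled by Spring, but how does that happen when component scanning never picks the class up? The broader question is how Quartz integrates with Spring Boot. The starting point is the starter dependency: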

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

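Following the usual starter mechanism, spring.factories lists
org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration
(newer Spring Boot versions register auto-configurations in META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports instead), so QuartzAutoConfiguration is loaded at startup. Let's look at it: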

@Configuration
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class,
        PlatformTransactionManager.class })
        // Binds spring.quartz.* into QuartzProperties
@EnableConfigurationProperties(QuartzProperties.class)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class,
        HibernateJpaAutoConfiguration.class })
public class QuartzAutoConfiguration {

private final QuartzProperties properties;

// QuartzProperties is injected through the constructor
public QuartzAutoConfiguration(QuartzProperties properties,
            ObjectProvider<SchedulerFactoryBeanCustomizer> customizers,
            ObjectProvider<JobDetail[]> jobDetails,
            ObjectProvider<Map<String, Calendar>> calendars,
            ObjectProvider<Trigger[]> triggers, ApplicationContext applicationContext) {
        this.properties = properties;
        this.customizers = customizers;
        this.jobDetails = jobDetails.getIfAvailable();
        this.calendars = calendars.getIfAvailable();
        this.triggers = triggers.getIfAvailable();
        this.applicationContext = applicationContext;
    }
    
    // Defines a SchedulerFactoryBean, but only if no SchedulerFactoryBean exists yet
    @Bean
    @ConditionalOnMissingBean
    public SchedulerFactoryBean quartzScheduler() {
        // Instantiate a SchedulerFactoryBean
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
        jobFactory.setApplicationContext(this.applicationContext);
        schedulerFactoryBean.setJobFactory(jobFactory);
        
        // Many of the settings that would otherwise go into quartz.properties can be applied here
        if (this.properties.getSchedulerName() != null) {
            schedulerFactoryBean.setSchedulerName(this.properties.getSchedulerName());
        }
        schedulerFactoryBean.setAutoStartup(this.properties.isAutoStartup());
        schedulerFactoryBean
                .setStartupDelay((int) this.properties.getStartupDelay().getSeconds());
        schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(
                this.properties.isWaitForJobsToCompleteOnShutdown());
        schedulerFactoryBean
                .setOverwriteExistingJobs(this.properties.isOverwriteExistingJobs());
        if (!this.properties.getProperties().isEmpty()) {
            schedulerFactoryBean
                    .setQuartzProperties(asProperties(this.properties.getProperties()));
        }
        if (this.jobDetails != null && this.jobDetails.length > 0) {
            schedulerFactoryBean.setJobDetails(this.jobDetails);
        }
        if (this.calendars != null && !this.calendars.isEmpty()) {
            schedulerFactoryBean.setCalendars(this.calendars);
        }
        if (this.triggers != null && this.triggers.length > 0) {
            schedulerFactoryBean.setTriggers(this.triggers);
        }
        customize(schedulerFactoryBean);
        return schedulerFactoryBean;
    }   
    
}
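
The lines that matter for the opening question are the ones that create a SpringBeanJobFactory and hand it the ApplicationContext: Quartz asks the job factory for a new job instance on every execution, and a Spring-aware factory can fill in the dependencies of that instance even though the job class was never component-scanned. The following is only a minimal plain-Java model of that idea, not Spring's implementation. The @Inject annotation, MiniContext and InjectingJobFactory are all hypothetical names invented for this sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

final class JobFactorySketch {

    // Hypothetical stand-in for @Autowired.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Inject {}

    // Hypothetical collaborator that lives in the "container".
    static class FlowManager {
        String describe() { return "flow-manager"; }
    }

    // The job class has no stereotype annotation, only an injection point.
    static class FlowSetupJob {
        @Inject
        private FlowManager flowManager;

        String execute() { return "running with " + flowManager.describe(); }
    }

    // Tiny stand-in for an application context: a registry keyed by type.
    static class MiniContext {
        private final Map<Class<?>, Object> beans = new HashMap<>();

        <T> void register(Class<T> type, T bean) { beans.put(type, bean); }

        Object lookup(Class<?> type) { return beans.get(type); }
    }

    // Creates a fresh job instance and fills its @Inject fields from the context.
    static class InjectingJobFactory {
        private final MiniContext context;

        InjectingJobFactory(MiniContext context) { this.context = context; }

        <T> T newJob(Class<T> jobClass) {
            try {
                Constructor<T> ctor = jobClass.getDeclaredConstructor();
                ctor.setAccessible(true);
                T job = ctor.newInstance();
                for (Field field : jobClass.getDeclaredFields()) {
                    if (field.isAnnotationPresent(Inject.class)) {
                        field.setAccessible(true);
                        field.set(job, context.lookup(field.getType()));
                    }
                }
                return job;
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Could not create job " + jobClass, ex);
            }
        }
    }

    static String demo() {
        MiniContext context = new MiniContext();
        context.register(FlowManager.class, new FlowManager());
        FlowSetupJob job = new InjectingJobFactory(context).newJob(FlowSetupJob.class);
        return job.execute();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "running with flow-manager"
    }
}
```

The point of the design is that the job never has to be a bean itself: the factory creates it on demand and injects into it, which is why FlowSetup can use @Autowired without any annotation on the class.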

The auto-configuration first defines a SchedulerFactoryBean. As its declaration shows, it
implements FactoryBean<Scheduler>, which makes it the producer of the Scheduler bean.

public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
        BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
    // getObject() is the method declared by FactoryBean<Scheduler>
    public Scheduler getObject() {
        return this.scheduler;
    }
}

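A note on FactoryBean versus BeanFactory: apart from the mirrored names, the two have little to do with each other. BeanFactory is the container itself, the factory that creates and hands out beans. A FactoryBean is a bean that produces another object: the container exposes whatever its getObject() returns (here the Scheduler), not the FactoryBean instance.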
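
To make the distinction concrete, here is a small plain-Java model. MiniFactoryBean and MiniBeanFactory are hypothetical, heavily simplified stand-ins written for this post, not the Spring interfaces. The one detail borrowed from Spring is the "&" prefix, which asks the container for the factory bean itself instead of its product.

```java
import java.util.HashMap;
import java.util.Map;

final class FactoryBeanSketch {

    // Hypothetical, simplified version of the FactoryBean contract.
    interface MiniFactoryBean<T> {
        T getObject();
        Class<?> getObjectType();
    }

    // Stand-in for the product (think: Scheduler).
    static class MiniScheduler {
        final String name;
        MiniScheduler(String name) { this.name = name; }
    }

    // Stand-in for SchedulerFactoryBean: a bean whose job is to produce another object.
    static class MiniSchedulerFactoryBean implements MiniFactoryBean<MiniScheduler> {
        private final MiniScheduler scheduler = new MiniScheduler("quartzScheduler");

        public MiniScheduler getObject() { return scheduler; }

        public Class<?> getObjectType() { return MiniScheduler.class; }
    }

    // Stand-in for the container.
    static class MiniBeanFactory {
        private final Map<String, Object> registered = new HashMap<>();

        void register(String name, Object bean) { registered.put(name, bean); }

        // By name: a factory bean is unwrapped unless the name starts with "&".
        Object getBean(String name) {
            if (name.startsWith("&")) {
                return registered.get(name.substring(1));
            }
            Object bean = registered.get(name);
            if (bean instanceof MiniFactoryBean) {
                return ((MiniFactoryBean<?>) bean).getObject();
            }
            return bean;
        }

        // By type: factory beans are matched on the type of object they produce.
        <T> T getBean(Class<T> type) {
            for (Object bean : registered.values()) {
                if (bean instanceof MiniFactoryBean) {
                    MiniFactoryBean<?> factoryBean = (MiniFactoryBean<?>) bean;
                    if (type.isAssignableFrom(factoryBean.getObjectType())) {
                        return type.cast(factoryBean.getObject());
                    }
                } else if (type.isInstance(bean)) {
                    return type.cast(bean);
                }
            }
            return null;
        }
    }

    static String demo() {
        MiniBeanFactory beanFactory = new MiniBeanFactory();
        beanFactory.register("quartzScheduler", new MiniSchedulerFactoryBean());
        Object byName = beanFactory.getBean("quartzScheduler");
        Object factoryItself = beanFactory.getBean("&quartzScheduler");
        MiniScheduler byType = beanFactory.getBean(MiniScheduler.class);
        return byName.getClass().getSimpleName() + "/"
                + factoryItself.getClass().getSimpleName() + "/" + (byName == byType);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```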

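QuartzAutoConfiguration also contains the inner class SchedulerDependsOnBeanFactoryPostProcessor, which makes the Scheduler and SchedulerFactoryBean beans depend on beans of the given types, so that those beans are initialized first.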


    /**
     * {@link AbstractDependsOnBeanFactoryPostProcessor} for Quartz {@link Scheduler} and
     * {@link SchedulerFactoryBean}.
     */
    private static class SchedulerDependsOnBeanFactoryPostProcessor extends AbstractDependsOnBeanFactoryPostProcessor {

        SchedulerDependsOnBeanFactoryPostProcessor(Class<?>... dependencyTypes) {
            super(Scheduler.class, SchedulerFactoryBean.class, dependencyTypes);
        }

    }

SchedulerDependsOnBeanFactoryPostProcessor is registered as a bean in QuartzSchedulerDependencyConfiguration.

        @Configuration(proxyBeanMethods = false)
        static class QuartzSchedulerDependencyConfiguration {

            @Bean
            static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerDataSourceInitializerDependsOnBeanFactoryPostProcessor() {
                return new SchedulerDependsOnBeanFactoryPostProcessor(QuartzDataSourceInitializer.class);
            }

            @Bean
            @ConditionalOnBean(FlywayMigrationInitializer.class)
            static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerFlywayDependsOnBeanFactoryPostProcessor() {
                return new SchedulerDependsOnBeanFactoryPostProcessor(FlywayMigrationInitializer.class);
            }

            @Configuration(proxyBeanMethods = false)
            @ConditionalOnClass(SpringLiquibase.class)
            static class LiquibaseQuartzSchedulerDependencyConfiguration {

                @Bean
                @ConditionalOnBean(SpringLiquibase.class)
                static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerLiquibaseDependsOnBeanFactoryPostProcessor() {
                    return new SchedulerDependsOnBeanFactoryPostProcessor(SpringLiquibase.class);
                }

            }

        }

    }

QuartzAutoConfiguration also declares a SchedulerFactoryBeanCustomizer that configures the DataSource for the schedulerFactoryBean.

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnSingleCandidate(DataSource.class)
    @ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc")
    protected static class JdbcStoreTypeConfiguration {

        @Bean
        @Order(0)
        public SchedulerFactoryBeanCustomizer dataSourceCustomizer(QuartzProperties properties, DataSource dataSource,
                @QuartzDataSource ObjectProvider<DataSource> quartzDataSource,
                ObjectProvider<PlatformTransactionManager> transactionManager,
                @QuartzTransactionManager ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
            return (schedulerFactoryBean) -> {
                DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
                schedulerFactoryBean.setDataSource(dataSourceToUse);
                PlatformTransactionManager txManager = getTransactionManager(transactionManager,
                        quartzTransactionManager);
                if (txManager != null) {
                    schedulerFactoryBean.setTransactionManager(txManager);
                }
            };
        }

        private DataSource getDataSource(DataSource dataSource, ObjectProvider<DataSource> quartzDataSource) {
            DataSource dataSourceIfAvailable = quartzDataSource.getIfAvailable();
            return (dataSourceIfAvailable != null) ? dataSourceIfAvailable : dataSource;
        }

        private PlatformTransactionManager getTransactionManager(
                ObjectProvider<PlatformTransactionManager> transactionManager,
                ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
            PlatformTransactionManager transactionManagerIfAvailable = quartzTransactionManager.getIfAvailable();
            return (transactionManagerIfAvailable != null) ? transactionManagerIfAvailable
                    : transactionManager.getIfUnique();
        }

        @Bean
        @ConditionalOnMissingBean
        public QuartzDataSourceInitializer quartzDataSourceInitializer(DataSource dataSource,
                @QuartzDataSource ObjectProvider<DataSource> quartzDataSource, ResourceLoader resourceLoader,
                QuartzProperties properties) {
            DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
            return new QuartzDataSourceInitializer(dataSourceToUse, resourceLoader, properties);
        }

    }

The customizers are applied by customizers.orderedStream().forEach((customizer) -> customizer.customize(schedulerFactoryBean));. The DataSource comes from DataSourceAutoConfiguration, so a data source has to be configured in application.yaml:

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/MYSLQ_local?serverTimezone=GMT-7&useLegacyDatetimeCode=false
    username: MYSLQ_local
    password: MYSLQ_local_local
  jpa:
    database-platform: org.hibernate.dialect.MySQL8Dialect

Next, look at @ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc"): this configuration only takes effect when spring.quartz.job-store-type is jdbc. So application.yaml also needs:

spring:
  quartz:
    properties:
      org:
        quartz:
          jobStore:
            tablePrefix: cpm_qrtz_
            isClustered: true
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 50
            threadsInheritContextClassLoaderOfInitializingThread: true
          scheduler:
            instanceId: AUTO
    job-store-type: jdbc

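One might ask: apart from job-store-type, where do the other settings above get picked up, and why do they just work? Start with @EnableConfigurationProperties(QuartzProperties.class): it binds everything under spring.quartz to a QuartzProperties object, and the entries under spring.quartz.properties end up in its properties map.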

@ConfigurationProperties("spring.quartz")
public class QuartzProperties {
    private final Map<String, String> properties = new HashMap<>();
    public Map<String, String> getProperties() {
        return this.properties;
    }
}

These properties are then passed to the schedulerFactoryBean in QuartzAutoConfiguration.quartzScheduler():

        if (!properties.getProperties().isEmpty()) {
            schedulerFactoryBean.setQuartzProperties(asProperties(properties.getProperties()));
        }
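
To see what ends up in that map, it helps to picture the nested YAML under spring.quartz.properties being flattened into dotted keys and then copied into a java.util.Properties. The sketch below imitates that with plain Java. The flatten helper is my own simplification of what Spring Boot's binder does, and the values are just the ones from the YAML above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

final class QuartzPropertiesSketch {

    // Recursively turns nested maps (as parsed from YAML) into dotted keys.
    static void flatten(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flatten(key, child, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    // Copies the flat map into a Properties object, the form Quartz expects.
    static Properties asProperties(Map<String, String> source) {
        Properties properties = new Properties();
        properties.putAll(source);
        return properties;
    }

    static Properties demo() {
        // Equivalent of the subtree below spring.quartz.properties in application.yaml.
        Map<String, Object> jobStore = new LinkedHashMap<>();
        jobStore.put("isClustered", true);
        jobStore.put("tablePrefix", "cpm_qrtz_");
        Map<String, Object> quartz = new LinkedHashMap<>();
        quartz.put("jobStore", jobStore);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("org", Map.of("quartz", quartz));

        Map<String, String> flat = new LinkedHashMap<>();
        flatten("", root, flat);
        return asProperties(flat);
    }

    public static void main(String[] args) {
        demo().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting keys, such as org.quartz.jobStore.isClustered, are exactly the names Quartz itself uses in quartz.properties.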

Create Scheduler

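When code injects a Scheduler with @Autowired, like this:

 @Autowired
 Scheduler scheduler;

the BeanFactory resolves the injection point by type (Scheduler) and finds the SchedulerFactoryBean, which is a FactoryBean<Scheduler>; the actual Scheduler is then obtained through SchedulerFactoryBean.getObject().
As the code shows, getObject() simply returns this.scheduler.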

public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
        BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
    private Scheduler scheduler;
    public Scheduler getObject() {
            return this.scheduler;
        }
}

But where is this Scheduler constructed? SchedulerFactoryBean implements InitializingBean, whose afterPropertiesSet() callback does the work:

public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
        BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
        
        public void afterPropertiesSet() throws Exception {
        ...
        // Initialize the Scheduler instance...
        // The Scheduler is initialized here
        this.scheduler = prepareScheduler(prepareSchedulerFactory());
        try {
            registerListeners();
            registerJobsAndTriggers();
        }
        catch (Exception ex) {
            ...
        }
    }
    
}

A simplified view of the bean lifecycle follows; note where InitializingBean#afterPropertiesSet() sits:
->constructor
->dependency injection (@Autowired fields and setters)
->BeanPostProcessor#postProcessBeforeInitialization
->@PostConstruct, then InitializingBean#afterPropertiesSet(), then @Bean(initMethod="xxx")
->BeanPostProcessor#postProcessAfterInitialization
->@PreDestroy (when the context shuts down)
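
The order matters because the Scheduler can only be built once every setter on the factory bean has been called. The sketch below replays the first part of the list by hand in plain Java. There is no container involved; Initializing and MiniSchedulerFactory are hypothetical names, and the two post-processor steps are only recorded as events to mark their position.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {

    // Hypothetical stand-in for InitializingBean.
    interface Initializing {
        void afterPropertiesSet();
    }

    // Stand-in for SchedulerFactoryBean: builds its product only after injection.
    static class MiniSchedulerFactory implements Initializing {
        private final List<String> events;
        private String dataSource;
        private String scheduler;

        MiniSchedulerFactory(List<String> events) {
            this.events = events;
            events.add("constructor");
        }

        void setDataSource(String dataSource) {
            this.dataSource = dataSource;
            events.add("dependency injection");
        }

        @Override
        public void afterPropertiesSet() {
            // Safe to use dataSource here: injection has already happened.
            this.scheduler = "scheduler backed by " + dataSource;
            events.add("afterPropertiesSet");
        }

        String getObject() { return scheduler; }
    }

    // Plays the role of the container and calls the callbacks in lifecycle order.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniSchedulerFactory bean = new MiniSchedulerFactory(events);
        bean.setDataSource("dataSource");
        events.add("postProcessBeforeInitialization");
        bean.afterPropertiesSet();
        events.add("postProcessAfterInitialization");
        events.add("product: " + bean.getObject());
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```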

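Now look at prepareSchedulerFactory(). It returns a SchedulerFactory, which is passed to prepareScheduler(); the Scheduler that comes back is what gets assigned to SchedulerFactoryBean's scheduler field.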

public class SchedulerFactoryBean{
    
    private Class<? extends SchedulerFactory> schedulerFactoryClass = 
    StdSchedulerFactory.class;
    
    private SchedulerFactory schedulerFactory;

    private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
        // With Spring Boot's defaults this field is null. It can be made non-null,
        // for example by supplying a factory through a SchedulerFactoryBeanCustomizer.
        SchedulerFactory schedulerFactory = this.schedulerFactory;
        if (schedulerFactory == null) {
            // Create local SchedulerFactory instance (typically a StdSchedulerFactory)
            // As the comment above says, this instantiates a StdSchedulerFactory by default
            schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
            if (schedulerFactory instanceof StdSchedulerFactory) {
                // initSchedulerFactory populates the StdSchedulerFactory with configuration.
                // As the official Quartz examples show, StdSchedulerFactory is what produces the Scheduler.
                initSchedulerFactory((StdSchedulerFactory) schedulerFactory);
            }
            else if (this.configLocation != null || this.quartzProperties != null ||
                    this.taskExecutor != null || this.dataSource != null) {
                throw new IllegalArgumentException(
                        "StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory);
            }
            // Otherwise, no local settings to be applied via StdSchedulerFactory.initialize(Properties)
        }
        // Otherwise, assume that externally provided factory has been initialized with appropriate settings
        return schedulerFactory;
    }
}

initSchedulerFactory mainly copies the configuration into the schedulerFactory:

private void initSchedulerFactory(StdSchedulerFactory schedulerFactory) throws SchedulerException, IOException {
        Properties mergedProps = new Properties();
        ...    
        CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
        ...
        // This is the important part: because of the merge above, anything that would go into
        // quartz.properties can be set in application.properties as spring.quartz.properties.xxx
        schedulerFactory.initialize(mergedProps);
    }

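CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps); copies the settings read from application.yaml into mergedProps, overwriting any defaults already placed there. This is how the configuration in application.yaml takes effect.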
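
The override behaviour is plain "last write wins" on a Properties object. Here is a minimal sketch using only java.util.Properties, without Spring's CollectionUtils. The default values in it are illustrative assumptions for the demo, not a statement of what Spring or Quartz actually sets.

```java
import java.util.Properties;

final class MergePropsSketch {

    // Later putAll calls overwrite earlier entries with the same key.
    static Properties merge(Properties defaults, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(defaults);
        merged.putAll(overrides);
        return merged;
    }

    static Properties demo() {
        // Illustrative defaults (assumed values for this example).
        Properties defaults = new Properties();
        defaults.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        defaults.setProperty("org.quartz.threadPool.threadCount", "10");

        // What arrived from spring.quartz.properties.* in application.yaml.
        Properties fromYaml = new Properties();
        fromYaml.setProperty("org.quartz.threadPool.threadCount", "50");
        fromYaml.setProperty("org.quartz.jobStore.isClustered", "true");

        return merge(defaults, fromYaml);
    }

    public static void main(String[] args) {
        Properties merged = demo();
        System.out.println(merged.getProperty("org.quartz.threadPool.threadCount")); // prints "50"
    }
}
```

Keys that only exist in the defaults survive the merge, keys that exist in both take the value from application.yaml, and new keys are simply added.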

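After a series of calls, prepareScheduler(prepareSchedulerFactory()) ends up in StdSchedulerFactory's private Scheduler instantiate(). It is a very long method,
but the logic is fairly simple: it initializes one component after another. Only the JobStore configuration is shown below: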

private Scheduler instantiate() throws SchedulerException {
        // cfg holds the mergedProps from the previous snippet
        if (cfg == null) {
            initialize();
        }
        // The JobStore: if none is configured, the default is RAMJobStore.
        // In application.properties it can be set with
        // spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
           String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());
    
    try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' could not be instantiated.", e);
            throw initException;
        }

        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

        // This picks up spring.quartz.properties.org.quartz.jobStore.xxx, where xxx is a
        // property of the JobStore class, such as isClustered or clusterCheckinInterval.
        // In application.properties: spring.quartz.properties.org.quartz.jobStore.isClustered=true
        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' props could not be configured.", e);
            throw initException;
        }

        
        
}

So to enable clustering, application.properties can be configured as follows:

spring.quartz.properties.org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
spring.quartz.properties.org.quartz.jobStore.isClustered = true
spring.quartz.properties.org.quartz.jobStore.clusterCheckinInterval = 10000
spring.quartz.properties.org.quartz.scheduler.instanceId = AUTO

Now back to the prepareScheduler method:

private Scheduler prepareScheduler(SchedulerFactory schedulerFactory) throws SchedulerException {
        if (this.resourceLoader != null) {
            // Make given ResourceLoader available for SchedulerFactory configuration.
            configTimeResourceLoaderHolder.set(this.resourceLoader);
        }
        if (this.taskExecutor != null) {
            // Make given TaskExecutor available for SchedulerFactory configuration.
            configTimeTaskExecutorHolder.set(this.taskExecutor);
        }
        if (this.dataSource != null) {
            // Make given DataSource available for SchedulerFactory configuration.
            configTimeDataSourceHolder.set(this.dataSource);
        }
        if (this.nonTransactionalDataSource != null) {
            // Make given non-transactional DataSource available for SchedulerFactory configuration.
            configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
        }

        // Get Scheduler instance from SchedulerFactory.
        try {
            Scheduler scheduler = createScheduler(schedulerFactory, this.schedulerName);
            populateSchedulerContext(scheduler);

            if (!this.jobFactorySet && !(scheduler instanceof RemoteScheduler)) {
                // Use AdaptableJobFactory as default for a local Scheduler, unless when
                // explicitly given a null value through the "jobFactory" bean property.
                this.jobFactory = new AdaptableJobFactory();
            }
            if (this.jobFactory != null) {
                if (this.applicationContext != null && this.jobFactory instanceof ApplicationContextAware) {
                    ((ApplicationContextAware) this.jobFactory).setApplicationContext(this.applicationContext);
                }
                if (this.jobFactory instanceof SchedulerContextAware) {
                    ((SchedulerContextAware) this.jobFactory).setSchedulerContext(scheduler.getContext());
                }
                scheduler.setJobFactory(this.jobFactory);
            }
            return scheduler;
        }

        finally {
            if (this.resourceLoader != null) {
                configTimeResourceLoaderHolder.remove();
            }
            if (this.taskExecutor != null) {
                configTimeTaskExecutorHolder.remove();
            }
            if (this.dataSource != null) {
                configTimeDataSourceHolder.remove();
            }
            if (this.nonTransactionalDataSource != null) {
                configTimeNonTransactionalDataSourceHolder.remove();
            }
        }
    }

The most important call here is createScheduler(schedulerFactory, this.schedulerName). As shown above, the schedulerFactory instance is a StdSchedulerFactory.

    protected Scheduler createScheduler(SchedulerFactory schedulerFactory, @Nullable String schedulerName)
            throws SchedulerException {

        // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
        Thread currentThread = Thread.currentThread();
        ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
        boolean overrideClassLoader = (this.resourceLoader != null &&
                this.resourceLoader.getClassLoader() != threadContextClassLoader);
        if (overrideClassLoader) {
            currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
        }
        try {
            SchedulerRepository repository = SchedulerRepository.getInstance();
            synchronized (repository) {
                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
                Scheduler newScheduler = schedulerFactory.getScheduler();
                if (newScheduler == existingScheduler) {
                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
                }
                if (!this.exposeSchedulerInRepository) {
                    // Need to remove it in this case, since Quartz shares the Scheduler instance by default!
                    SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
                }
                return newScheduler;
            }
        }
        finally {
            if (overrideClassLoader) {
                // Reset original thread context ClassLoader.
                currentThread.setContextClassLoader(threadContextClassLoader);
            }
        }
    }

Let's look at StdSchedulerFactory::getScheduler:

    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }
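
The lookup in getScheduler is a name-keyed registry: an existing scheduler with the same name is reused unless it has been shut down. That is also why Spring's createScheduler compares newScheduler with existingScheduler and refuses to adopt a scheduler that was already registered. The sketch below models only that lookup logic. MiniScheduler and the map-based repository are hypothetical stand-ins, and registering the new instance directly inside getScheduler is a simplification made for this example.

```java
import java.util.HashMap;
import java.util.Map;

final class SchedulerRepositorySketch {

    // Hypothetical stand-in for a Quartz Scheduler.
    static final class MiniScheduler {
        final String name;
        boolean shutdown;

        MiniScheduler(String name) { this.name = name; }
    }

    // Stand-in for the scheduler repository: at most one scheduler per name.
    private final Map<String, MiniScheduler> byName = new HashMap<>();

    synchronized MiniScheduler getScheduler(String name) {
        MiniScheduler existing = byName.get(name);
        if (existing != null) {
            if (existing.shutdown) {
                // A dead scheduler is dropped so a fresh one can take its place.
                byName.remove(name);
            } else {
                return existing;
            }
        }
        MiniScheduler created = new MiniScheduler(name);
        byName.put(name, created);
        return created;
    }

    static String demo() {
        SchedulerRepositorySketch repository = new SchedulerRepositorySketch();
        MiniScheduler first = repository.getScheduler("quartzScheduler");
        MiniScheduler second = repository.getScheduler("quartzScheduler");
        first.shutdown = true;
        MiniScheduler third = repository.getScheduler("quartzScheduler");
        // Same instance while running, new instance after shutdown.
        return (first == second) + "," + (first == third);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,false"
    }
}
```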

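Back in instantiate(): it sets up all the startup parameters, such as the thread pool and the JobStore class. The method is extremely long, so I won't go through it in detail, but the key line is: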

    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

The QuartzScheduler constructor starts QuartzSchedulerThread, the thread that hands triggered jobs to the worker threads:

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

QuartzSchedulerThread::run is long, but I still want to show it in full:

@Override
    public void run() {
        int acquiresFailed = 0;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    } catch (RuntimeException e) {
                        if (acquiresFailed == 0) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

Finding a runnable job trigger

The run method does several things. The first is to find the job triggers that are ready to fire:

                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }

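Why does a given firing of a Quartz job run on only one machine? The answer lies mainly in qsRsrcs.getJobStore().acquireNextTriggers, shown here in its JDBC-backed implementation: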

    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException("error validating trigger acquisition", e);
                        }
                    }
                });
    }

Wrapped in this lock, the real acquireNextTrigger is executed:

        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
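The wrapping pattern can be sketched in plain Java. This is only an illustration under assumptions: the class and method names are hypothetical, a ReentrantLock stands in for the shared TRIGGER_ACCESS lock (in a cluster the lock has to live somewhere all instances can see, which as far as I understand is the database), and the transaction handling is left out. The useLock flag plays the role of lockName being null or not.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class LockedCallbackSketch {

    // Stand-in for the shared TRIGGER_ACCESS lock (hypothetical, in-process only).
    private final ReentrantLock triggerAccess = new ReentrantLock();

    // Runs the callback while holding the lock when one is requested,
    // and always releases the lock afterwards, even if the callback throws.
    public <T> T executeInLock(boolean useLock, Supplier<T> callback) {
        if (useLock) {
            triggerAccess.lock();
        }
        try {
            return callback.get();
        } finally {
            if (useLock) {
                triggerAccess.unlock();
            }
        }
    }

    public boolean heldByCurrentThread() {
        return triggerAccess.isHeldByCurrentThread();
    }

    public static void main(String[] args) {
        LockedCallbackSketch store = new LockedCallbackSketch();
        boolean insideWithLock = store.executeInLock(true, store::heldByCurrentThread);
        boolean insideWithoutLock = store.executeInLock(false, store::heldByCurrentThread);
        // prints "true false false"
        System.out.println(insideWithLock + " " + insideWithoutLock + " " + store.heldByCurrentThread());
    }
}
```

The try/finally is the important part: whatever the callback does, the lock is released before the method returns.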

Now for the actual database logic:

   // FUTURE_TODO: this really ought to return something like a FiredTriggerBundle,
   // so that the fireInstanceId doesn't have to be on the trigger...
   protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
       throws JobPersistenceException {
       if (timeWindow < 0) {
         throw new IllegalArgumentException();
       }
       
       List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
       Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
       final int MAX_DO_LOOP_RETRY = 3;
       int currentLoopCount = 0;
       do {
           currentLoopCount ++;
           try {
               List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
               
               // No trigger is ready to fire yet.
               if (keys == null || keys.size() == 0)
                   return acquiredTriggers;

               long batchEnd = noLaterThan;

               for(TriggerKey triggerKey: keys) {
                   // If our trigger is no longer available, try a new one.
                   OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                   if(nextTrigger == null) {
                       continue; // next trigger
                   }
                   
                   // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                   // put it back into the timeTriggers set and continue to search for next trigger.
                   JobKey jobKey = nextTrigger.getJobKey();
                   JobDetail job;
                   try {
                       job = retrieveJob(conn, jobKey);
                   } catch (JobPersistenceException jpe) {
                       try {
                           getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                           getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                       } catch (SQLException sqle) {
                           getLog().error("Unable to set trigger state to ERROR.", sqle);
                       }
                       continue;
                   }
                   
                   if (job.isConcurrentExectionDisallowed()) {
                       if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                           continue; // next trigger
                       } else {
                           acquiredJobKeysForNoConcurrentExec.add(jobKey);
                       }
                   }

                   Date nextFireTime = nextTrigger.getNextFireTime();

                   // A trigger should not return NULL on nextFireTime when fetched from DB.
                   // But for whatever reason if we do have this (BAD trigger implementation or
                   // data?), we then should log a warning and continue to next trigger.
                   // User would need to manually fix these triggers from DB as they will not
                   // able to be clean up by Quartz since we are not returning it to be processed.
                   if (nextFireTime == null) {
                       log.warn("Trigger {} returned null on nextFireTime and yet still exists in DB!",
                           nextTrigger.getKey());
                       continue;
                   }
                   
                   if (nextFireTime.getTime() > batchEnd) {
                     break;
                   }
                   // We now have a acquired trigger, let's add to return list.
                   // If our trigger was no longer in the expected state, try a new one.
                   int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                   if (rowsUpdated <= 0) {
                       continue; // next trigger
                   }
                   nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                   getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                   if(acquiredTriggers.isEmpty()) {
                       batchEnd = Math.max(nextFireTime.getTime(), System.currentTimeMillis()) + timeWindow;
                   }
                   acquiredTriggers.add(nextTrigger);
               }

               // if we didn't end up with any trigger to fire from that first
               // batch, try again for another batch. We allow with a max retry count.
               if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                   continue;
               }
               
               // We are done with the while loop.
               break;
           } catch (Exception e) {
               throw new JobPersistenceException(
                         "Couldn't acquire next trigger: " + e.getMessage(), e);
           }
       } while (true);
       
       // Return the acquired trigger list
       return acquiredTriggers;
   }

Two calls matter most:

  • Select the candidate triggers
    List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

  • Mark the trigger as acquired, so that no other instance will pick it up

                    // We now have a acquired trigger, let's add to return list.
                    // If our trigger was no longer in the expected state, try a new one.
                    int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                    if (rowsUpdated <= 0) {
                        continue; // next trigger
                    }
                    nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                    getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
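The state update above is effectively a compare-and-set: the trigger moves from WAITING to ACQUIRED only if it is still WAITING, and the number of updated rows tells the caller whether it won. Here is a minimal in-memory sketch of that idea, assuming a ConcurrentHashMap as a stand-in for the triggers table. All names are hypothetical and no database is involved.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TriggerClaimSketch {

    public enum State { WAITING, ACQUIRED }

    // Stand-in for the triggers table: trigger key -> current state.
    private final ConcurrentMap<String, State> triggerStates = new ConcurrentHashMap<>();

    public void addTrigger(String key) {
        triggerStates.put(key, State.WAITING);
    }

    // Changes the state only if it is still WAITING. The return value plays
    // the role of "rows updated": 1 if this caller claimed the trigger, 0 otherwise.
    public int claim(String key) {
        return triggerStates.replace(key, State.WAITING, State.ACQUIRED) ? 1 : 0;
    }

    public static void main(String[] args) {
        TriggerClaimSketch store = new TriggerClaimSketch();
        store.addTrigger("nightlyReport");
        int first = store.claim("nightlyReport");  // the first instance claims it
        int second = store.claim("nightlyReport"); // a second instance gets 0
        // prints "1 0"
        System.out.println(first + " " + second);
    }
}
```

A caller that gets 0 back simply skips the trigger, which matches the `continue; // next trigger` branch in the excerpt.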

Running the job

Back to the run method of QuartzSchedulerThread shown earlier:

                      for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }

The key part is the following fragment, which actually runs a job:

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

Let's look at shell.initialize(qs):

    public void initialize(QuartzScheduler sched)
        throws SchedulerException {
        this.qs = sched;

        Job job = null;
        JobDetail jobDetail = firedTriggerBundle.getJobDetail();

        try {
            job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
        } catch (SchedulerException se) {
            sched.notifySchedulerListenersError(
                    "An error occured instantiating job to be executed. job= '"
                            + jobDetail.getKey() + "'", se);
            throw se;
        } catch (Throwable ncdfe) { // such as NoClassDefFoundError
            SchedulerException se = new SchedulerException(
                    "Problem instantiating class '"
                            + jobDetail.getJobClass().getName() + "' - ", ncdfe);
            sched.notifySchedulerListenersError(
                    "An error occured instantiating job to be executed. job= '"
                            + jobDetail.getKey() + "'", se);
            throw se;
        }

        this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
    }

Here sched.getJobFactory() produces the job. Where was this jobFactory set? Go back to where we started, QuartzAutoConfiguration::quartzScheduler:

        SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        schedulerFactoryBean.setJobFactory(jobFactory);

So the jobFactory is a SpringBeanJobFactory, and the method that matters is SpringBeanJobFactory::createJobInstance:

    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object job = (this.applicationContext != null ?
                this.applicationContext.getAutowireCapableBeanFactory().createBean(
                        bundle.getJobDetail().getJobClass(), AutowireCapableBeanFactory.AUTOWIRE_CONSTRUCTOR, false) :
                super.createJobInstance(bundle));

        if (isEligibleForPropertyPopulation(job)) {
            BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job);
            MutablePropertyValues pvs = new MutablePropertyValues();
            if (this.schedulerContext != null) {
                pvs.addPropertyValues(this.schedulerContext);
            }
            pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap());
            pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
            if (this.ignoredUnknownProperties != null) {
                for (String propName : this.ignoredUnknownProperties) {
                    if (pvs.contains(propName) && !bw.isWritableProperty(propName)) {
                        pvs.removePropertyValue(propName);
                    }
                }
                bw.setPropertyValues(pvs);
            }
            else {
                bw.setPropertyValues(pvs, true);
            }
        }

        return job;
    }

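Its very first statement hands the job class from the incoming TriggerFiredBundle to the application context's createBean, which instantiates the class and performs dependency injection on the new instance. That was the missing piece for me. It is why the QuartzJobBean at the start of this article can have @Autowired fields without carrying any annotation of its own.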
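To make the idea concrete without pulling in Spring, here is a simplified stand-in for what such a factory does: create a fresh instance from the job class, then fill its marked fields from a registry of known objects. Everything here is hypothetical, including the @Inject marker, the registry, and the FlowManager and FlowSetupJob classes (loosely modelled on the example at the top). Spring's real createBean does considerably more than this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class InjectingJobFactorySketch {

    // Hypothetical marker playing the role of @Autowired.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Inject {}

    // Hypothetical collaborator that the job depends on.
    static class FlowManager {
        String describe() { return "flow manager"; }
    }

    // Hypothetical job class: no registration anywhere, only a marked field.
    static class FlowSetupJob {
        @Inject
        private FlowManager flowManager;

        String run() { return "running with " + flowManager.describe(); }
    }

    // Stand-in for the application context: type -> singleton instance.
    private final Map<Class<?>, Object> singletons = new HashMap<>();

    <T> void register(Class<T> type, T instance) {
        singletons.put(type, instance);
    }

    // Creates a new job instance on every call and injects its @Inject fields.
    <T> T newJob(Class<T> jobClass) {
        try {
            T job = jobClass.getDeclaredConstructor().newInstance();
            for (Field field : jobClass.getDeclaredFields()) {
                if (field.isAnnotationPresent(Inject.class)) {
                    field.setAccessible(true);
                    field.set(job, singletons.get(field.getType()));
                }
            }
            return job;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create job " + jobClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        InjectingJobFactorySketch factory = new InjectingJobFactorySketch();
        factory.register(FlowManager.class, new FlowManager());
        FlowSetupJob job = factory.newJob(FlowSetupJob.class);
        // prints "running with flow manager"
        System.out.println(job.run());
    }
}
```

The job class never needs to be scanned or declared anywhere: the factory receives the class, builds the object, and wires it at the moment the trigger fires.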

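The call qsRsrcs.getThreadPool().runInThread(shell) is where the job actually runs. There is more logic after that, such as releasing the trigger, which I won't go through in detail. This article only walks through the main points of how Quartz integrates with Spring Boot.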
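As a rough picture of that hand-off, the shell is a Runnable that gets passed to a worker pool, and a boolean tells the scheduler thread whether the pool accepted it. The sketch below assumes a standard ExecutorService in place of Quartz's own thread pool. The class and method names are hypothetical and the behaviour is only similar in spirit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RunInThreadSketch {

    // Stand-in for the scheduler's worker pool.
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    // Hands the shell to a worker thread. Returns false instead of throwing
    // when the pool no longer accepts work (for example after shutdown).
    public boolean runInThread(Runnable shell) {
        try {
            workers.execute(shell);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    // Stops accepting work and waits briefly for running shells to finish.
    public void shutdownAndWait() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        RunInThreadSketch pool = new RunInThreadSketch();
        AtomicInteger executed = new AtomicInteger();
        boolean accepted = pool.runInThread(() -> executed.incrementAndGet());
        pool.shutdownAndWait();
        boolean acceptedAfterShutdown = pool.runInThread(() -> executed.incrementAndGet());
        // prints "true 1 false"
        System.out.println(accepted + " " + executed.get() + " " + acceptedAfterShutdown);
    }
}
```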
