在大多数情况下,我们都希望任务能按照预期的时间执行。我最常接触的是Spring自带的@Scheduled注解,或是使用XXL-JOB完成对采数逻辑的调度。@Scheduled注解无法直接完成分布式的任务调度,需要配合关系型数据库的行级锁实现互斥,这无疑增大了设计的复杂度;使用XXL-JOB则增大了data-pipeline对第三方应用的依赖。基于以上考虑,我选择直接集成Quartz。Quartz提供了基于数据库行级锁互斥的分布式调度方案,下面罗列集成的步骤:
- pom添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
- application-*.yml添加如下配置(可根据注释按照实际情况调整):
quartz:
  jobStore:
    # 数据保存方式为数据库持久化
    class: org.quartz.impl.jdbcjobstore.JobStoreTX
    # JobDataMaps是否都为String类型,默认false
    useProperties: false
    # 表的前缀,默认QRTZ_
    tablePrefix: QRTZ_
    # 是否加入集群
    isClustered: true
    # 调度实例失效的检查时间间隔 ms
    clusterCheckinInterval: 5000
    # 数据库代理类
    driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    # 当设置为“true”时,此属性告诉Quartz 在非托管JDBC连接上调用setTransactionIsolation
    txIsolationLevelReadCommitted: true
  scheduler:
    # 调度标识名 集群中每一个实例都必须使用相同的名称
    instanceName: ClusterQuartz
    # ID设置为自动获取 每一个必须不同
    instanceId: AUTO
  threadPool:
    # 线程池的实现类(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
    class: org.quartz.simpl.SimpleThreadPool
    # 指定线程数,一般设置为1-100之间的整数,根据系统资源配置
    threadCount: 5
    # 设置线程的优先级(可以是Thread.MIN_PRIORITY(即1)和Thread.MAX_PRIORITY(即10)之间的任何int)
    threadPriority: 5
- 创建类SpringJobFactory.java,解决job中service注入为空的问题
package cn.juque.datapipeline.quartz;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Quartz job factory that passes every job instance it creates through
 * Spring autowiring.
 *
 * <p>Purpose: solves the problem of services injected into a job being
 * {@code null}.
 *
 * @author juque
 * @version 1.0.0
 * @date 2023-04-08 21:58:32
 **/
@Component
public class SpringJobFactory extends AdaptableJobFactory {

    /** Bean factory used to autowire job instances after they are created. */
    @Resource
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    /**
     * Creates the job instance through the parent factory and then autowires it.
     *
     * @param bundle the trigger-fired bundle handed over by Quartz
     * @return the job instance after autowiring
     * @throws Exception if the parent factory fails to create the instance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        // Populate the injection points of the freshly created job.
        autowireCapableBeanFactory.autowireBean(job);
        return job;
    }
}
- 创建配置类QuartzConfig.java
package cn.juque.datapipeline.config;
import cn.juque.datapipeline.quartz.SpringJobFactory;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Properties;
/**
 * Spring configuration that builds the Quartz {@link SchedulerFactoryBean}
 * from the {@code quartz.*} application properties and exposes the resulting
 * {@link Scheduler}.
 *
 * @author juque
 * @version 1.0.0
 * @date 2023-04-07 15:05:33
 **/
@Configuration
public class QuartzConfig {

    @Value("${quartz.jobStore.useProperties}")
    private Boolean jobStoreUseProperties;

    @Value("${quartz.jobStore.tablePrefix}")
    private String jobStoreTablePrefix;

    @Value("${quartz.jobStore.isClustered}")
    private Boolean jobStoreIsClustered;

    @Value("${quartz.jobStore.clusterCheckinInterval}")
    private String jobStoreClusterCheckinInterval;

    @Value("${quartz.jobStore.txIsolationLevelReadCommitted}")
    private Boolean jobStoreTxIsolationLevelReadCommitted;

    @Value("${quartz.jobStore.class}")
    private String jobStoreClass;

    @Value("${quartz.jobStore.driverDelegateClass}")
    private String jobStoreDriverDelegateClass;

    @Value("${quartz.scheduler.instanceName}")
    private String schedulerInstanceName;

    @Value("${quartz.scheduler.instanceId}")
    private String schedulerInstanceId;

    @Value("${quartz.threadPool.class}")
    private String threadPoolClass;

    @Value("${quartz.threadPool.threadCount}")
    private String threadPoolThreadCount;

    @Value("${quartz.threadPool.threadPriority}")
    private String threadPoolThreadPriority;

    /** Job factory that autowires job instances. */
    @Resource
    private SpringJobFactory springJobFactory;

    /**
     * Builds the scheduler factory bean backed by the given data source.
     *
     * @param dataSource data source handed to the scheduler factory
     * @return the configured factory bean
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setDataSource(dataSource);
        schedulerFactoryBean.setQuartzProperties(this.buildQuartzProperties());
        schedulerFactoryBean.setSchedulerName("dpp-scheduler");
        schedulerFactoryBean.setStartupDelay(1);
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
        // Overwrite jobs that already exist when the scheduler starts, so the
        // matching qrtz_job_details rows need not be deleted by hand after
        // every change to the target object.
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.setJobFactory(this.springJobFactory);
        return schedulerFactoryBean;
    }

    /**
     * Exposes the {@link Scheduler} produced by the factory bean.
     *
     * @param dataSource data source forwarded to {@link #schedulerFactoryBean(DataSource)}
     * @return the scheduler held by the factory bean
     */
    @Bean
    public Scheduler scheduler(DataSource dataSource) {
        // NOTE(review): this relies on @Configuration method proxying so that
        // the call below yields the container-managed factory bean rather than
        // a second, uninitialized one - confirm proxyBeanMethods is not disabled.
        return this.schedulerFactoryBean(dataSource).getScheduler();
    }

    /**
     * Collects the injected settings under their {@code org.quartz.*} keys.
     *
     * <p>Every value is stored as a {@code String}:
     * {@link Properties#getProperty(String)} returns {@code null} for any
     * non-String value, so {@code Boolean} entries added with {@code put}
     * would be invisible to string-based lookups and settings such as
     * {@code isClustered} would silently fall back to their defaults.
     *
     * @return the Quartz properties
     */
    private Properties buildQuartzProperties() {
        Properties properties = new Properties();
        properties.setProperty("org.quartz.jobStore.useProperties",
                String.valueOf(jobStoreUseProperties));
        properties.setProperty("org.quartz.jobStore.tablePrefix", jobStoreTablePrefix);
        properties.setProperty("org.quartz.jobStore.isClustered",
                String.valueOf(jobStoreIsClustered));
        properties.setProperty("org.quartz.jobStore.clusterCheckinInterval",
                jobStoreClusterCheckinInterval);
        properties.setProperty("org.quartz.jobStore.txIsolationLevelReadCommitted",
                String.valueOf(jobStoreTxIsolationLevelReadCommitted));
        properties.setProperty("org.quartz.jobStore.class", jobStoreClass);
        properties.setProperty("org.quartz.jobStore.driverDelegateClass",
                jobStoreDriverDelegateClass);
        properties.setProperty("org.quartz.scheduler.instanceName", schedulerInstanceName);
        properties.setProperty("org.quartz.scheduler.instanceId", schedulerInstanceId);
        properties.setProperty("org.quartz.threadPool.class", threadPoolClass);
        properties.setProperty("org.quartz.threadPool.threadCount", threadPoolThreadCount);
        properties.setProperty("org.quartz.threadPool.threadPriority", threadPoolThreadPriority);
        return properties;
    }
}
- 至此,基础配置已完成。
- 下面参考任务调度的实现,创建TaskGroupJobServiceImpl.java
package cn.juque.datapipeline.service.impl;
import cn.juque.datapipeline.constans.BusinessConstants;
import cn.juque.datapipeline.helper.TaskInfoHelper;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * Quartz job that starts a task group.
 *
 * <p>On each execution it reads the task group ID stored in the job detail's
 * data map under {@code BusinessConstants.GROUP_ID_KEY} and passes it to
 * {@code TaskInfoHelper.startTask}.
 *
 * @author juque
 * @version 1.0.0
 * @date 2023-04-07 15:12:39
 **/
@Service("taskGroupJobService")
public class TaskGroupJobServiceImpl implements Job {

    /** Helper that actually starts the task group. */
    @Resource
    private TaskInfoHelper taskInfoHelper;

    /**
     * Starts the task group whose ID is carried by the job detail.
     *
     * @param jobExecutionContext execution context supplied by Quartz
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        final JobDataMap data = jobExecutionContext.getJobDetail().getJobDataMap();
        this.taskInfoHelper.startTask(data.getString(BusinessConstants.GROUP_ID_KEY));
    }
}
- 添加任务的逻辑,具体参考:QuartzHelper.java
/**
 * Registers a Quartz job and its trigger for a task group.
 *
 * <p>The job carries the group ID in its data map. The trigger is cron-based
 * when the group's execute type equals {@code GroupExecuteTypeEnum.CORN};
 * otherwise it repeats forever every {@code delaySeconds} seconds, with the
 * first run {@code delaySeconds} seconds from now.
 *
 * @param groupInfo task group information
 * @throws AppException with {@code MessageEnum.SYSTEM_ERROR} when the
 *                      scheduler rejects the job
 */
public void addJob(TaskGroupInfo groupInfo) {
    JobDetail jobDetail = JobBuilder.newJob(TaskGroupJobServiceImpl.class)
            .withDescription(groupInfo.getGroupName())
            .withIdentity(QUARTZ_JOB_NAME + groupInfo.getId(), QUARTZ_JOB_GROUP_NAME)
            // Pass the task group ID on to the job.
            .usingJobData(BusinessConstants.GROUP_ID_KEY, groupInfo.getId())
            .build();

    // NOTE(review): CORN looks like a misspelling of CRON; the constant is
    // declared outside this snippet, so it is used as-is.
    boolean cronDriven = GroupExecuteTypeEnum.CORN.getCode().equals(groupInfo.getExecuteType());

    // Description and identity are the same for both trigger flavours.
    TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger()
            .withDescription(groupInfo.getGroupName())
            .withIdentity(QUARTZ_TRIGGER_NAME + groupInfo.getId(), QUARTZ_TRIGGER_GROUP_NAME);

    Trigger trigger;
    if (cronDriven) {
        trigger = triggerBuilder
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule(groupInfo.getCron()))
                .build();
    } else {
        trigger = triggerBuilder
                .startAt(DateUtil.offsetSecond(new Date(), groupInfo.getDelaySeconds()))
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(groupInfo.getDelaySeconds()))
                .build();
    }

    try {
        this.scheduler.scheduleJob(jobDetail, trigger);
        log.info("完成任务组【{}】的调度初始化", groupInfo.getGroupName());
    } catch (Exception ex) {
        log.error("任务组【{}】调度任务初始化失败", groupInfo.getGroupName(), ex);
        throw new AppException(MessageEnum.SYSTEM_ERROR);
    }
}
- 至此完成对quartz的集成,并实现quartz对任务组的分布式调度。下面附加对Job的其他操作方法:
/**
 * Deletes the Quartz job that belongs to a task group.
 *
 * @param groupId task group ID; the job is looked up under the name
 *                {@code QUARTZ_JOB_NAME + groupId} in {@code QUARTZ_JOB_GROUP_NAME}
 * @throws AppException with {@code MessageEnum.SYSTEM_ERROR} when the
 *                      scheduler call fails
 */
public void deleteJob(String groupId) {
    final JobKey key = JobKey.jobKey(QUARTZ_JOB_NAME + groupId, QUARTZ_JOB_GROUP_NAME);
    try {
        scheduler.deleteJob(key);
    } catch (Exception ex) {
        log.error("任务ID【{}】调度任务删除失败", groupId, ex);
        throw new AppException(MessageEnum.SYSTEM_ERROR);
    }
}
/**
 * Checks whether a Quartz job is registered for a task group.
 *
 * @param groupId task group ID; the job is looked up under the name
 *                {@code QUARTZ_JOB_NAME + groupId} in {@code QUARTZ_JOB_GROUP_NAME}
 * @return the result of the scheduler's existence check for that job key
 * @throws AppException with {@code MessageEnum.SYSTEM_ERROR} when the
 *                      scheduler call fails
 */
public Boolean existsJob(String groupId) {
    JobKey jobKey = JobKey.jobKey(QUARTZ_JOB_NAME + groupId, QUARTZ_JOB_GROUP_NAME);
    try {
        return this.scheduler.checkExists(jobKey);
    } catch (Exception e) {
        // The message used to read "deletion failed" (copied from deleteJob);
        // it now names the operation that actually failed.
        log.error("任务ID【{}】调度任务存在性检查失败", groupId, e);
        throw new AppException(MessageEnum.SYSTEM_ERROR);
    }
}
/**
 * Triggers the job of a task group once, immediately.
 *
 * @param groupId task group ID; the job is looked up under the name
 *                {@code QUARTZ_JOB_NAME + groupId} in {@code QUARTZ_JOB_GROUP_NAME}
 * @throws AppException with {@code MessageEnum.SYSTEM_ERROR} when the
 *                      scheduler call fails
 */
public void runOnce(String groupId) {
    JobKey jobKey = JobKey.jobKey(QUARTZ_JOB_NAME + groupId, QUARTZ_JOB_GROUP_NAME);
    try {
        this.scheduler.triggerJob(jobKey);
    } catch (Exception e) {
        // Log the group ID and the failed operation, as deleteJob does,
        // instead of using the raw exception message as the log pattern.
        log.error("任务ID【{}】调度任务触发失败", groupId, e);
        throw new AppException(MessageEnum.SYSTEM_ERROR);
    }
}
/**
 * Pauses the job of a task group.
 *
 * @param groupId task group ID; the job is looked up under the name
 *                {@code QUARTZ_JOB_NAME + groupId} in {@code QUARTZ_JOB_GROUP_NAME}
 * @throws AppException with {@code MessageEnum.SYSTEM_ERROR} when the
 *                      scheduler call fails
 */
public void pauseJob(String groupId) {
    JobKey jobKey = JobKey.jobKey(QUARTZ_JOB_NAME + groupId, QUARTZ_JOB_GROUP_NAME);
    try {
        this.scheduler.pauseJob(jobKey);
    } catch (Exception e) {
        // Log the group ID and the failed operation, as deleteJob does,
        // instead of using the raw exception message as the log pattern.
        log.error("任务ID【{}】调度任务暂停失败", groupId, e);
        throw new AppException(MessageEnum.SYSTEM_ERROR);
    }
}
/**
 * Resumes the paused job of a task group.
 *
 * @param groupId task group ID; the job is looked up under the name
 *                {@code QUARTZ_JOB_NAME + groupId} in {@code QUARTZ_JOB_GROUP_NAME}
 * @throws AppException with {@code MessageEnum.SYSTEM_ERROR} when the
 *                      scheduler call fails
 */
public void resumeJob(String groupId) {
    JobKey jobKey = JobKey.jobKey(QUARTZ_JOB_NAME + groupId, QUARTZ_JOB_GROUP_NAME);
    try {
        this.scheduler.resumeJob(jobKey);
    } catch (Exception e) {
        // Log the group ID and the failed operation, as deleteJob does,
        // instead of using the raw exception message as the log pattern.
        log.error("任务ID【{}】调度任务恢复失败", groupId, e);
        throw new AppException(MessageEnum.SYSTEM_ERROR);
    }
}