springboot集成quartz实现动态任务调度

quartz是一个开源的作业调度框架,本文就是介绍下springboot框架下继承quartz的一些使用示例

首先我们需要添加quartz的spring-boot-starter-quartz依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>

我们需要做一些配置。quartz提供了基于内存(MEMORY)和基于jdbc的两种任务存储方式。具体的配置可参考下面的示例

spring:
  quartz:
    jdbc:
      initialize-schema: never
    job-store-type: MEMORY
    properties:
      org:
        quartz:
          scheduler:
            instanceName: myQuartzCluster
            instanceId: AUTO
          jobstore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: qrtz_
            isClustered: true
            clusterCheckinInterval: 10000
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 5
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true

需要注意的是,如果是集群部署而且定时任务同时只能一个实例运行的话,需要配置成jdbc方式,这样就可以用到数据库的锁。MEMORY这种方式建议用于开发环境测试,不建议用于生产环境。

如果是使用jdbc的方式,quartz提供了不同数据库的建表语句。可以在quartz的jar包中找到,具体路径是org.quartz.impl.jdbcjobstore

quartz_ddl.png

定时任务的类需要继承自QuartzJobBean并重写 executeInternal方法。

package com.xchaset.quartz.schedule.task;

import com.xchaset.quartz.annotation.MyScheduled;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@DisallowConcurrentExecution
@MyScheduled(name = "EmailTask",description = "email schedule task",cronExpression = "0 0/2 * * * ?")
public class EmailTask extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("{} is execute!",this.getClass().getSimpleName());
    }
}

@DisallowConcurrentExecution注解可以控制我们的相同类的定时任务同时只有一个执行。通常的场景是如果定时任务实际执行时间大于设定的定时任务执行间隔时间,就会导致同时多个任务并行。

@MyScheduled是自定义的一个注解,主要用于应用启动时将定时任务注册。

package com.xchaset.quartz.annotation;

import org.springframework.stereotype.Service;

import java.lang.annotation.*;

@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Service
public @interface MyScheduled {

    String name() default "";

    String description() default "";

    String cronExpression() default "";

    boolean enable() default true;

}

获取注解类上的信息

package com.xchaset.quartz.utils;

import com.xchaset.quartz.annotation.MyScheduled;
import com.xchaset.quartz.schedule.JobBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class MyAnnotationUtil {

    public static List<JobBean> getAllMyScheduledTask(){
        List<JobBean> jobBeans = new ArrayList<>();
        Map<String, QuartzJobBean> beansOfType = SpringContextHolder.getBeansOfType(QuartzJobBean.class);
        for (Map.Entry<String, QuartzJobBean> beanEntry : beansOfType.entrySet()) {
            MyScheduled annotation = AnnotationUtils.findAnnotation(beanEntry.getValue().getClass(), MyScheduled.class);
            QuartzJobBean value = beanEntry.getValue();
            if (annotation == null) {
                continue;
            }
            if (annotation.enable()) {
                String name = annotation.name();
                String description = annotation.description();
                String cronExpression = annotation.cronExpression();
                JobBean build = JobBean.builder().jobName(name)
                        .groupName("myGroup")
                        .cronExpression(cronExpression)
                        .description(description)
                        .quartzJobBean(value.getClass())
                        .build();
                jobBeans.add(build);
            }
        }
        return jobBeans;
    }
}

启动时注册任务

package com.xchaset.quartz.schedule;

import com.xchaset.quartz.utils.MyAnnotationUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class ScheduleCommandLine implements CommandLineRunner {

    @Autowired
    private QuartzService quartzService;

    @Override
    public void run(String... args) throws Exception {
        List<JobBean> allMyScheduledTask = MyAnnotationUtil.getAllMyScheduledTask();
        for (JobBean jobBean : allMyScheduledTask) {
            quartzService.deleteJob(jobBean.getGroupName(),jobBean.getJobName());
            quartzService.addJob(jobBean.getQuartzJobBean(),jobBean.getGroupName(),jobBean.getJobName(),jobBean.getCronExpression(),null);
        }
    }
}

对于怎么做到动态的添加、暂停、删除、恢复、执行一个任务呢? quartz提供了这样的API。我们只需要稍微做一下封装。以接口的形式提供出去就可以了。

package com.xchaset.quartz.schedule;

import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;

@Service
public class QuartzService {

    @Autowired
    private Scheduler scheduler;

    @PostConstruct
    public void startScheduler() {
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 添加一个任务
     * @param jobClass
     * @param jobGroup
     * @param jobName
     * @param jobTime
     * @param jobMap
     * @throws SchedulerException
     */
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobGroup, String jobName, String jobTime, Map jobMap) throws SchedulerException {
        JobDetail build = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build();
        if (jobMap != null && jobMap.size() > 0) {
            build.getJobDataMap().putAll(jobMap);
        }
        Trigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
                .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();

        scheduler.scheduleJob(build, cronTrigger);

    }

    /**
     * 更新任务时间表达式
     * @param jobGroup
     * @param jobName
     * @param jobTime
     * @throws SchedulerException
     */
    public void updateJob(String jobGroup, String jobName, String jobTime) throws SchedulerException {
        TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
        CronTrigger trigger = (CronTrigger)scheduler.getTrigger(triggerKey);
        trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
                .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
        scheduler.rescheduleJob(triggerKey,trigger);
    }

    /**
     * 删除任务
     * @param jobGroup
     * @param jobName
     * @throws SchedulerException
     */
    public void deleteJob(String jobGroup,String jobName) throws SchedulerException {
        scheduler.deleteJob(new JobKey(jobName,jobGroup));
    }

    public void pauseJob(String jobGroup,String jobName) throws SchedulerException {
        scheduler.pauseJob(new JobKey(jobName,jobGroup));
    }

    /**
     * 恢复任务
     * @param jobGroup
     * @param jobName
     * @throws SchedulerException
     */
    public void resumeJob(String jobGroup,String jobName) throws SchedulerException {
        scheduler.resumeJob(new JobKey(jobName,jobGroup));
    }

    /**
     * 立刻执行一个任务
     * @param jobGroup
     * @param jobName
     * @throws SchedulerException
     */
    public void runAJobNow(String jobGroup,String jobName) throws SchedulerException {
        scheduler.triggerJob(new JobKey(jobName,jobGroup));
    }

    /**
     * 查询所有的任务
     * @return
     * @throws SchedulerException
     */
    public List<JobDetailsBean> queryAllJob() throws SchedulerException {
        GroupMatcher<JobKey> anyJobGroup = GroupMatcher.anyJobGroup();
        Set<JobKey> jobKeys = scheduler.getJobKeys(anyJobGroup);
        List<JobDetailsBean> result = getJobDetailsBeans(jobKeys);
        return result;
    }

    private List<JobDetailsBean> getJobDetailsBeans(Collection<JobKey> jobKeys) throws SchedulerException {
        List<JobDetailsBean> result = new ArrayList<>();
        for (JobKey jobKey : jobKeys) {
            List<? extends Trigger> triggersOfJob = scheduler.getTriggersOfJob(jobKey);
            for (Trigger trigger : triggersOfJob) {
                String group = jobKey.getGroup();
                String name = jobKey.getName();
                String className = jobKey.getClass().getName();
                String description = trigger.getDescription();
                JobDataMap jobDataMap = trigger.getJobDataMap();
                Date nextFireTime = trigger.getNextFireTime();
                Date previousFireTime = trigger.getPreviousFireTime();
                Trigger.TriggerState triggerState = scheduler.getTriggerState(TriggerKey.triggerKey(name, group));
                String cronExpression = "";
                if (trigger instanceof CronTrigger) {
                    cronExpression = ((CronTrigger) trigger).getCronExpression();
                }

                JobDetailsBean build = JobDetailsBean.builder().className(className).groupName(group).jobName(name).description(description)
                        .cronExpression(cronExpression).jobDataMap(jobDataMap).nextFireTime(nextFireTime).previousFireTime(previousFireTime)
                        .triggerState(triggerState.name()).build();
                result.add(build);
            }
        }
        return result;
    }

    /**
     * 查询正在执行的任务
     * @return
     * @throws SchedulerException
     */
    public List<JobDetailsBean> queryCurrentExecutingJob() throws SchedulerException {
        List<JobDetailsBean> result = new ArrayList<>();
        List<JobExecutionContext> currentlyExecutingJobs = scheduler.getCurrentlyExecutingJobs();
        for (JobExecutionContext currentlyExecutingJob : currentlyExecutingJobs) {
            Trigger trigger = currentlyExecutingJob.getTrigger();
            JobDataMap jobDataMap = trigger.getJobDataMap();
            JobKey jobKey = trigger.getJobKey();
            String group = jobKey.getGroup();
            String name = jobKey.getName();
            String className = jobKey.getClass().getName();
            Date nextFireTime = trigger.getNextFireTime();
            String description = trigger.getDescription();
            Date previousFireTime = trigger.getPreviousFireTime();
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            String cronExpression = "";
            if (trigger instanceof CronTrigger) {
                cronExpression = ((CronTrigger) trigger).getCronExpression();
            }
            JobDetailsBean build = JobDetailsBean.builder().className(className).groupName(group).jobName(name).description(description)
                    .cronExpression(cronExpression).jobDataMap(jobDataMap).nextFireTime(nextFireTime).previousFireTime(previousFireTime)
                    .triggerState(triggerState.name()).build();
            result.add(build);
        }
        return result;
    }
}

我们可以在上面代码的基础上,对外提供接口,这样就可以通过可视化的界面来动态的对任务进行调度了。

以上就是springboot集成quartz的简单使用了。

源码可以参考github仓库: https://github.com/xchaset/example

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容