定时任务原理以及自定义定时任务的管理

概述

由于一些定时任务调度,可能会存在调整的问题。
本文主要是讲述一下定时任务的实现原理。

定时任务的用法

@Component
public class TestTask {

    @Scheduled(cron = "0/1 * * * * ?")
    public void cron() {
        System.out.println("nothing to do ");
    }
    
    @Scheduled(fixedDelay = 1000)
    public void fixedDelay() {
        System.out.println("nothing to do ");
    }
    
    @Scheduled(fixedRate = 1000)
    public void fixedRate() {
        System.out.println("nothing to do ");
    }
}
主要分为三种。cron以及fixedDelay,fixRate。
这三者的区别吧。
cron其实是定时执行-到某个点马上执行,当然这里是线程资源足够的前提。
fixedDelay,固定延迟
fixRate,固定频率延迟,如果没有执行完,下次任务会马上执行。网上说会有并发问题。我阅读源码没有发现这个问题。因为所有下次的执行都是依赖上一次执行完毕的。

spring如何实现定时任务原理

通过导入配置类

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

配置类做了什么?

其实配置类比较简单。通过注入了一个内置的后置处理器。
这个后置处理器主要是在bean初始化时,做了一些操作。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

}
后置处理器做了什么呢?很简单。
bean初始化的时候进行了扫描.
针对方法的Schedule注解进行扫描。
public class ScheduledAnnotationBeanPostProcessor
        implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
        Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
        SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledAnnotations) ->
                        scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
}
扫描后做什么呢?
那就是进行注册,源码如下.其实就是利用了registrar进行注册。
    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Runnable runnable = createRunnable(bean, method);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Determine initial delay
            long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }

            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }

            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Check fixed delay
            long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }

            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }

            // Check fixed rate
            long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }

            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);

            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

那为什么注册之后,任务就可以运行了

其实在注册之后,便会将任务,放入线程池。等待被执行。
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
    @Nullable
    public ScheduledFuture<?> schedule() {
        synchronized (this.triggerContextMonitor) {
                        //通过触发器拿到下次执行任务的时间。
            this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
            if (this.scheduledExecutionTime == null) {
                return null;
            }
                        //计算出延迟的时间,放入队列。
            long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
            this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
            return this;
        }
    }
}

那么如何定时被执行呢?

class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
    @Override
    public void run() {
        Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
                //执行任务代码
        super.run();
        Date completionTime = new Date(this.triggerContext.getClock().millis());
        synchronized (this.triggerContextMonitor) {
            Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
            this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
            if (!obtainCurrentFuture().isCancelled()) {
                                //执行完如果任务没有被取消。开始套娃。
                schedule();
            }
        }
    }
}

fixDelay&fixRate

其实二者的实现也是一样的,套娃调度。
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        //构建成需要的任务结构
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //延时执行
        delayedExecute(t);
        return t;
    }

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                //执行完设置下次的执行时间
                setNextRunTime();
                //将任务放入队列
                reExecutePeriodic(outerTask);
            }
        }
}
因此目前得知。
其实spring也没有将定时任务转成特定的bean.
因此我们只要能够机械能任务的注册即可。
另外则是,定时任务每次的执行,都是需要上一次完成才可以。所以不存在所谓的并发问题。
另外则是下一次的执行时间

脚手架中如何实现定时任务

@ConditionalOnBean(value = ScheduleConfiguration.class)
@Configuration
public class TaskConfiguration implements SchedulingConfigurer{
    
    //用于保存定时任务注册器
    private static ScheduledTaskRegistrar scheduledTaskRegistrar;

    private static Map<String, ScheduledTask> TASK_MAP = new HashMap<String, ScheduledTask>();

    public static Map<String, ScheduledTask> getTASK_MAP() {
        return TASK_MAP;
    }
    
    public static ScheduledTaskRegistrar getScheduledTaskRegistrar() {
        return scheduledTaskRegistrar;
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        TaskConfiguration.scheduledTaskRegistrar = taskRegistrar;
    }
}



比较简单。实现接口,并且注入容器即可
首先这里有2个接口
第一个接口则是用于cron表达式类型的定时任务。
public abstract class CronTask implements Task{.
        //获取cron表达式,默认从容器中获取
    public String getCron() {
        return ApplicationContextUtils.getPropertyStrValue(getCronKey());
    }
        //设置任务名称
    public abstract String getName();
    //设置cron表达式的key
    public abstract String getCronKey();
}


第二个接口则是用于延迟类型的定时任务。
public abstract class FixDelayTask implements Task{
    public abstract String getFixDelayKey();
    public abstract String getInitialDelayKey();
    //获取任务的初始延时间隔
    public long getInitialDelay() {
        return ApplicationContextUtils.getPropertyValue(getInitialDelayKey(), Long.class);
    };
    //获取任务的延时间隔
    public long getFixedDelay() {
        return ApplicationContextUtils.getPropertyValue(getFixDelayKey(), Long.class);
    }
    //是否固定频率。如果为true则为fixRate, false则为fixDelay
    public abstract boolean isFixedRate();
}


下面是几个demo.

@Component
public class CustomCronTask extends CronTask{
    @Override
    public void run() {
        System.out.println(this.getName() + "nothing to do");
    }

    @Override
    public String getName() {
        return "cron";
    }

    @Override
    public String getCronKey() {
        return "custom.task.alarm.cron";
    }
}

@Component
public class CustomFixDelayTask extends FixDelayTask{

    @Override
    public String getName() {
        return "delay";
    }

    @Override
    public void run() {
        System.out.println(this.getName() + "nothing to do");
    }

    @Override
    public String getFixDelayKey() {
        return "custom.task.alarm.fixdelay";
    }

    @Override
    public String getInitialDelayKey() {
        return "custom.task.alarm.fix-initdelay";
    }

    @Override
    public boolean isFixedRate() {
        return false;
    }

}

@Component
public class CustomFixRateTask extends FixDelayTask{

    @Override
    public String getName() {
        return "rate";
    }

    @Override
    public void run() {
        System.out.println(this.getName() + "nothing to do");
    }

    @Override
    public String getFixDelayKey() {
        return "custom.task.alarm.fixrate";
    }

    @Override
    public String getInitialDelayKey() {
        return "custom.task.alarm.fixrate-initdelay";
    }

    @Override
    public boolean isFixedRate() {
        return true;
    }
}

脚手架如何实现对定时任务的管理

其实定时任务的核心点在于,获取下次任务的执行时间。
spring的定时任务本身,提供了触发器可以去解决这个问题。
因此我们的核心点在于,去重新实现对应的触发器。
目前支持的触发器类型有俩种。
一种是定时触发器CronTrigger,另外一种则是周期触发器PeriodicTrigger.


覆写的源码如下。
public class CustomCronTrigger extends CronTrigger{
    
    public CustomCronTrigger(CronTask cronTask) {
        //其实只是初始化时有点用,后面没用。统一从容器获取
        super(cronTask.getCron());
        this.cronTask = cronTask;
    }

    private CronTask cronTask;

    @Override
    public Date nextExecutionTime(TriggerContext triggerContext) {
        Date date = triggerContext.lastCompletionTime();
        if (date != null) {
            Date scheduled = triggerContext.lastScheduledExecutionTime();
            if (scheduled != null && date.before(scheduled)) {
                date = scheduled;
            }
        }
        else {
            date = new Date(triggerContext.getClock().millis());
        }
        ZonedDateTime dateTime = ZonedDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
        //由于cron只会初始化一次。因此,改为从容器中读取。主要是支持配置的动态刷新
        ZonedDateTime next = CronExpression.parse(this.getExpression()).next(dateTime);
        return (next != null ? Date.from(next.toInstant()) : null); 
    }

    @Override
    public String getExpression() {
        return cronTask.getCron();
    }
}

public class CustomFixDelayTrigger extends PeriodicTrigger{
    private FixDelayTask ft;
    public CustomFixDelayTrigger(FixDelayTask ft) {
        super(ft.getFixedDelay());
        this.ft = ft;
    }

    @Override
    public long getPeriod() {
        return ft.getFixedDelay();
    }

    @Override
    public long getInitialDelay() {
        return ft.getInitialDelay();
    }

    

    @Override
    public boolean isFixedRate() {
        return ft.isFixedRate();
    }

    @Override
    public Date nextExecutionTime(TriggerContext triggerContext) {
        Date lastExecution = triggerContext.lastScheduledExecutionTime();
        Date lastCompletion = triggerContext.lastCompletionTime();
        if (lastExecution == null || lastCompletion == null) {
            return new Date(triggerContext.getClock().millis() + this.getInitialDelay());
        }
        if (this.isFixedRate()) {
            return new Date(lastExecution.getTime() + this.getPeriod());
        }
        return new Date(lastCompletion.getTime() + this.getPeriod());
    }
}

脚手架-定时任务的注册

时机比较简单。针对服务启动时即可。
因此这里使用哨兵对服务进行初始化
@Component
public class ContextStandGuard  implements ApplicationRunner, DisposableBean {

    @Autowired
    private TaskService taskService;
    
    @Override
    public void destroy() {
        TaskConfiguration.getTASK_MAP().values().forEach(task -> {
            task.cancel(true);
        });
        System.out.println("容器摧毁时,顺便摧毁定时任务");
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("容器启动");
        taskService.init();
    }
}

脚手架-定时任务的注册实现

原理比较简单。
从容器中获取bean。
然后构建对应的触发器即可。

@Component
public class TaskServiceImpl implements TaskService{
    
    @Override
    public void init() {
        List<Task> tasks = ApplicationContextUtils.getBeanList(Task.class);
        if (CollectionUtils.isEmpty(tasks)) {
            return;
        }
        for (Task task : tasks) {
            TriggerTask triggerTask = getTriggerTask(task);
            if(triggerTask == null) {
                continue;
            }
            ScheduledTask st = TaskConfiguration.getScheduledTaskRegistrar().scheduleTriggerTask(triggerTask);
            TaskConfiguration.getTASK_MAP().put(task.getName(), st);
        }
    }

    private TriggerTask getTriggerTask(Task task) {
        Trigger tg = genTrigger(task);
        if(tg == null) {
            return null;
        }
        return new TriggerTask(task, tg);
    }
    
    private Trigger genTrigger(Task task) {
        if(CronTask.class.isAssignableFrom(task.getClass())) {
            CronTask ct = (CronTask)task;
            return new CustomCronTrigger(ct);
        }else if(FixDelayTask.class.isAssignableFrom(task.getClass())) {
            FixDelayTask ft = (FixDelayTask)task;
            return new CustomFixDelayTrigger(ft);
        }
        return null;
    }

    @Override
    public boolean destory(String taskname) {
        TaskConfiguration.getTASK_MAP().get(taskname).cancel();
        return true;
    }

    @Override
    public boolean register(String taskname) {
        return true;
    }
}

脚手架-定时任务的取消

这里首先回到源码。
如果一个任务正在执行,这会被取消了,会有什么影响吗?
没有任何影响,但是下次任务就没办法执行了。

class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {

    @Override
    public void run() {
        Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
        super.run();
        Date completionTime = new Date(this.triggerContext.getClock().millis());
        synchronized (this.triggerContextMonitor) {
            Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
            this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
                        从源码来看-只要isCancelled为true就不会执行了,那么定时任务就会套娃失败。任务就被移除了。
            if (!obtainCurrentFuture().isCancelled()) {
                schedule();
            }
        }
    }

    @Override
    public boolean isCancelled() {
        synchronized (this.triggerContextMonitor) {
            return obtainCurrentFuture().isCancelled();
        }
    }

}
所以脚手架对于定时任务的取消。也比较简单了。就是直接简单的移除即可
    @Override
    public boolean destory(String taskname) {
        TaskConfiguration.getTASK_MAP().get(taskname).cancel();
        return true;
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容