起因
一个基于Spring Cloud框架的应用,如果注册到了Eureka server,那么它就会定时更新服务列表,这个定时任务启动的代码在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,如下(来自工程eureka-client,版本1.10.7):
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
//更新服务列表
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 初始化定时拉取服务注册信息
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
...
//略去其他代码
由此可见,TimedSupervisorTask类被使用在了定时任务的初始化中,我们具体来看看这个类的结构:
package com.netflix.discovery;
import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.LongGauge;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 在执行超时时调度子任务的管理器任务
* A supervisor task that schedules subtasks while enforce a timeout.
* Wrapped subtasks must be thread safe.
*
* @author David Qiang Liu
*/
public class TimedSupervisorTask extends TimerTask {
private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);
private final Counter successCounter;
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
private final String name;
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor executor;
private final long timeoutMillis;
private final Runnable task;
private final AtomicLong delay;
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
//默认30秒
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
//默认300秒
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
successCounter = Monitors.newCounter("success");
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
@Override
public void run() {
Future<?> future = null;
try {
//使用Future,可以设定子纯种的超时时间,这样当前线程就不用无限等待了
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
//指定等待子线程的最长时间(初始为30秒)
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
//delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
//任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间(300秒)
long newDelay = Math.min(maxDelay, currentDelay * 2);
//设置为最新的值,考虑到多线程,所以用了CAS
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
//触发了拒绝策略,就会将调度器停掉
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
//一旦出现未知的异常,就停掉调度器
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
//这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;
if (future != null) {
future.cancel(true);
}
//只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务
if (!scheduler.isShutdown()) {
//这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时dealy的值,
//假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为300秒(构造方法的入参expBackOffBound)
//如果最近一次任务没有超时,那么就在30秒后开始新任务,
//如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的300秒)
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
...
//略去其他代码
}
我们可以仔细看看run方法的具体实现,因为这里有一个值得借鉴的设计思路!!!
我们简单来看看这个方法具体执行流程:
1.执行submit()方法提交任务
2.执行future.get()方法,如果没有在规定的时间得到返回值或者任务出现异常,则进入异常处理catch代码块。
3.如果发生异常
a. 发生TimeoutException异常,则执行Math.min(maxDelay, currentDelay * 2);得到任务延时时间 * 2 和 最大延时时间的最小值,然后改变任务的延时时间timeoutMillis(延时任务时间默认值是30s)
b.发生RejectedExecutionException异常,则将rejectedCounter值+1
c.发生Throwable异常,则将throwableCounter值+1
4.如果没有发生异常,则再设置一次延时任务时间timeoutMillis
5.进入finally代码块
a.如果future不为null,则执行future.cancel(true),中断线程停止任务
b.如果线程池没有shutdown,则创建一个新的定时任务
注意:不知道有没有小伙伴发现,不管我们的定时任务执行是成功还是结束(如果还没有执行结束,也会被中断),然后会再重新初始化一个新的任务。并且这个任务的延时时间还会因为不同的情况受到改变,在try代码块中如果不发现异常,则会重新初始化延时时间,如果发生TimeoutException异常,则会更改延时时间,更改为 任务延时时间 * 2 和 最大延时时间的最小值。所以我们会发现这样的设计会让整个延时任务很灵活。如果不发生异常,则延时时间不会变;如果发现异常,则增长延时时间;如果程序又恢复正常了,则延时时间又恢复成了默认值。
总结:我们在设计延时/周期性任务时就可以参考TimedSupervisorTask的实现,程序一旦遇到发生超时异常,就将间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再发生超时异常,间隔时间又会自动恢复为初始值。