定时任务示例代码:
先提交的任务500ms再执行,后提交的任务100ms后执行,打印结果是后提交的任务先执行,本文主要分析定时线程池内部实现,这里面定时操作是怎么实现的,怎么保证顺序的
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);
final long now = System.currentTimeMillis();
executor.schedule(new Runnable() {
@Override
public void run() {
System.out.println("第一个任务开始 距离提交时间间隔");
System.out.println(System.currentTimeMillis() - now);
}
}, 500, TimeUnit.MILLISECONDS);
executor.schedule(new Runnable() {
@Override
public void run() {
System.out.println("第二个任务开始 距离提交时间间隔");
System.out.println(System.currentTimeMillis() - now);
}
}, 100, TimeUnit.MILLISECONDS);
分析ScheduledThreadPoolExecutor 构造函数
内部主要是构造的队列,线程池里面维护的队列是DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
分析提交任务发生了什么
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
我们传进去参数是Runnable,而schedule内部要将Runnable转成ScheduledFutureTask
对于decorate默认就是返回参数task,也就是不处理
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
继续分析ScheduledFutureTask
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
内部构造函数传入时间戳,唯一id,周期
这个时间戳是 triggerTime(delay, unit)
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
也就是在提交任务时候把提交的时间加上延迟得出需要执行的时间戳,至于周期后面会说,是指周期性定时任务的周期
线程池执行schedule,构造好线程 ScheduledFutureTask,然后执行delayedExecute(t),分析这个方法如下
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
如果线程池关闭则执行拒绝策略。否则加入到阻塞队列,也就是前面说的构造函数里面DelayedWorkQueue,正常情况如果不关闭线程池则继续走ensurePrestart
ensurePrestart()内部实现
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
当前工作线程数量小于核心数量则开启工作线程,对于核心线程池为0也要开启工作线程处理
工作线程
其实就是线程池内部真正工作的线程,看addWorker内部实现
private boolean addWorker(Runnable firstTask, boolean core) {
... CAS操作线程总数量加1
...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.add(w);
workerAdded = true;
}
} finally{
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
工作线程就是new 传统意义上的线程new Thread,上锁处理添加到内部数组( workers是工作线程数组,实现是HashSet)中去,添加成功就开始执行start方法,触发Worker工作,Worker的工作就看它的run方法,
public void run() {
runWorker(this);
}
runWorker内部实现如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
} finally {
w.unlock();
......
这个工作线程是个死循环,task != null || (task = getTask()) != null也就是不停的getTask,然后task.run(),如此循环以达到线程池复用线程
跟下去getTask就是阻塞队列里面take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
定时线程池添加任务是阻塞队列添加任务,执行的任务也是从阻塞队列获取
定时线程池内部的阻塞队列
DelayedWorkQueue
添加任务 add操作也就是offer
public boolean offer(Runnable x) {
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
流程就是上锁然后把任务添加到队列,内部是一个数组,并且有siftUp操作,其实是优先队列,也就是生产者消费者模型来唤醒通知消费,这样队列取的操作就有数据了。队列使用锁所以是线程安全的。
DelayedWorkQueue取的流程
取有take和poll,区别是队列为空,take会等待但是poll直接返回null,而线程池取任务是take
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
这里就是阻塞队列的取操作,其实也就是queue[0],其实也是优先队列的获取最近一个需要执行的任务,优先队列比较的就是时间戳(提交任务时间+定时时间=需要执行任务的时间)优先队列比较参考代码
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
优先队列内部比较的其实就是时间戳了,时间戳最小的到时候会被取出来
周期性的线程调度原理
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("第二个任务开始 距离提交时间间隔");
System.out.println(System.currentTimeMillis() - now);
}
}, 100,5000, TimeUnit.MILLISECONDS);
执行任务,针对periodic,
原来就是执行的时候,把下一次执行的时间改一下啦,然后重新执行任务
执行任务会根据isPeriodic()来判断是否是周期性任务
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
对于周期性判断,其实就是传入的周期不为0
public boolean isPeriodic() {
return period != 0;
}
对于周期性任务,重新设置下一次执行任务时间戳,重新调度一次任务
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
定时任务怎么保证定时的时间
最后那不免有个疑问,这里优先队列保证取得是队列最近一个,那如果耗时是10s,那取出第一个,怎么保证过10s才延迟执行任务呢
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
当从阻塞队列取出任务,是要比较里面的时间的,也就是getDelay方法
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
取出来任务时间戳比当前时间要多,后续就会await多久然后再去获取任务来保证达到定时时间
这里就把定时任务线程池分析完毕了
总结定时线程池定时策略:内部阻塞队列以当前时间+定时时间算出该线程需要运行的时间点,对于线程池来说只是开启工作线程不停的
从队列中获取任务然后执行,而怎么做到取最近一个延迟任务都是交给队列实现,内部是通过优先队列实现,进而保证取出来的都是最近要执行的任务
知识点:优先队列,线程池调度策略,阻塞队列