jdk中提供了多种定时器的实现。不禁让人好奇,jdk究竟是如何做到让任务到了指定时间执行的?试问,如果是我,我会怎么实现?
jdk中能够实现定时器功能的大致有三种方式:
- java.util.Timer
- java.util.concurrent.DelayQueue
- java.util.concurrent.ScheduledThreadPoolExecutor
静下心来,咱们一一探究。
一. java.util.Timer
示例代码:
/**
* 安排指定的任务task在指定的时间firstTime开始进行重复的固定速率period执行
* 每天中午12点都执行一次
*
* @author Fooisart
* Created on 21:46 14-01-2019
*/
public class TimerDemo {
public static void main(String[] args) {
Timer timer = new Timer();
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY, 12);//控制小时
calendar.set(Calendar.MINUTE, 0);//控制分钟
calendar.set(Calendar.SECOND, 0);//控制秒
Date time = calendar.getTime();//执行任务时间为12:00:00
//每天定时12:00执行操作,每隔2秒执行一次
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(new Date() + "执行任务。。。");
}
}, time, 1000 * 2);
}
}
Demo中使用了Timer
实现了一个定时任务,该任务在每天12点开始执行,并且每隔2秒执行一次。
顺手牵羊:查看源码时,无意发现Timer
中有schedule
与scheduleAtFixedRate
,它俩都可以到约定时间按照指定时间间隔执行。然而它俩的区别是什么呢?官方解释:一个是Fixed-delay
,一个是Fixed-rate
。那么这两个词到底是什么意思呢?把demo中的代码运行一遍,然后把schedule
换成scheduleAtFixedRate
,就全部了然了。
示例代码中较为简洁,能看出控制执行时间的方法应该是 timer.schedule()
,跟进去看源码:
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}
- task 表示要执行的任务逻辑
- firstTime 表示第一次执行的时间
- period 表示每次间隔时间
继续跟进:
private void sched(TimerTask task, long time, long period) {
//省略非重点代码
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");
synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
}
这里其实做了两个事情
- 给
task
设定了一些参数,类似于初始化task
。这里还给它加了把锁,可以思考一下为甚要在此初始化?为何要加锁?(不是本文范畴,各位伙伴自行思考) - 把初始化后的
task
加入到queue
中。
读到这里,我们还是没有看到到底是如何实现定时的?别着急,继续。进入queu.add(task)
,
/**
* Adds a new task to the priority queue.
*/
void add(TimerTask task) {
// Grow backing store if necessary
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);
queue[++size] = task;
fixUp(size);
}
这里注释提到,加入一个新任务到优先级队列中去。其实这里的TimerTask[]
是一个优先级队列,使用数组存储方式。并且它的数据结构是heap
。包括从fixUp()
我们也能看出来,它是在保持堆
属性,即堆化(heapify)
。
那么能分析的都分析完了,还是没能看到定时是如何实现的?再次静下来想一想,定时任务如果想执行,首先得启动定时器。所有咱们再次关注构造方法。
Timer一共有4个构造方法,看最底层的:
public Timer(String name) {
thread.setName(name);
thread.start();
}
可以看到,这里在启动一个thread
,那么既然是一个Thread,那肯定就得关注它的 run()
方法了。进入:
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
继续进入mainLoop()
:
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
//省略
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
从上述源码中,可以看出有两个重要的if
,
-
if (taskFired = (executionTime<=currentTime))
,表示已经到了执行时间,那么下面执行任务就好了; -
if (!taskFired)
,表示未到执行时间,那么等待就好了。那么是如何等待的呢?再仔细一看,原来是调用了Object.wait(long timeout)
。
到这里我们知道了,原来jdk中的定时器是这样实现的啊,等待是使用最简单的Object.wait()
实现的啊!别着急,这里有个小提问:使用Therad.sleep()可以实现嘛?如果可以,为何不用呢?
java.util.concurrent.DelayQueue
比较细致地分析了java.util.Timer
,DelayQueue
也大同小异。整理一下心情,重新出发。
先上示例代码:
DelayQueue
它本质上是一个队列,而这个队列里也只有存放Delayed
的子类才有意义,所有定义了DelayTask:
public class DelayTask implements Delayed {
private Date startDate = new Date();
public DelayTask(Long delayMillions) {
this.startDate.setTime(new Date().getTime() + delayMillions);
}
@Override
public int compareTo(Delayed o) {
long result = this.getDelay(TimeUnit.NANOSECONDS)
- o.getDelay(TimeUnit.NANOSECONDS);
if (result < 0) {
return -1;
} else if (result > 0) {
return 1;
} else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
Date now = new Date();
long diff = startDate.getTime() - now.getTime();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
}
public static void main(String[] args) throws Exception {
BlockingQueue<DelayTask> queue = new DelayQueue<>();
DelayTask delayTask = new DelayTask(1000 * 5L);
queue.put(delayTask);
while (queue.size()>0){
queue.take();
}
}
看main
方法,主要做了三件事:
- 构造DelayTask,其中的延迟时间是5秒
- 将任务放入队列
- 从队列中取任务
DelayQueue
跟刚才的Timer.TaskQueue
是比较相似的,都是优先级队列,放入元素时,都得堆化(DelayQueue.put()如果元素满了,会阻塞。自行研究)。重点看queue.take()
。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
源码中出现了三次await
字眼:
- 第一次是当队列为空时,等待;
- 第二次等待是因为,发现有任务,没有到执行时间,并且有准备执行的线程(leader)。咱们得讲理吧,既然已经有人在准备执行了,咱们就得等吧。
- 第三次是真正延时的地方了,
available.awaitNanos(delay)
,此时也没有别的线程要执行,也就是我将要执行,所有等待剩下的延迟时间即可。
这里咱们明白了,DelayQueue的等待是通过Condition.await()
来实现的。请注意,这里又有一个小问题了:Object.wait()与Conditon.await()有何异同?
java.util.concurrent.ScheduledThreadPoolExecutor
由于ScheduledThreadPoolExecutor
涉及到的线程池(ThreadPoolExecutor)内容较多,所有就不详细分析了,也考虑到读到这里,难免有些疲倦。直接简述一下结论:在创建ScheduledThreadPoolExecutor
时,线程池的工作队列使用的是DelayedWorkQueue
,它的take()
方法,与DelayQueue.take()
方法极其相似,也有三个等待。
至此,要结束了。总结一下,jdk中实现定时器一共有两种方式:
- 使用Object.wait()
- 使用Conditon.await()
还记得文中的两个小提问嘛:
- 使用Therad.sleep()可以实现嘛?如果可以,为何不用呢?
- Object.wait()与Conditon.await()有何异同?
As always and let me know what you think, leave the comments below. Bye :)