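The JDK's built-in Timer and ScheduledThreadPoolExecutor can both run scheduled tasks. Both are built on a binary min-heap, so adding or removing a task costs O(log n). When the number of tasks is very large, performance degrades.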
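The paper "Hashed and Hierarchical Timing Wheels" describes a data structure called the timing wheel, which reduces the cost of managing delayed tasks to O(1).
Here is a borrowed diagram of a timing wheel:
[Figure: timing wheel diagram]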
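As the name suggests, a timing wheel works like a clock face. Each cell on the wheel is called a slot and represents one unit of time (a tick), for example 1 second. Each slot holds a queue of delayed tasks, so the number of slots multiplied by the tick duration bounds how far into the future the wheel can reach. To insert a delayed task, work out how many ticks its delay spans and take that modulo the number of slots, counting from the current pointer, to find the slot whose queue it joins. Every time one tick elapses, the pointer advances to the next slot and every task in that slot's queue is fired.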
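To make the basic scheme concrete, here is a minimal sketch of a single-level wheel without rounds. It assumes delays are given in whole ticks and are shorter than the wheel; `BasicTimingWheel`, `add` and `tick` are hypothetical names, not part of any library, and a real timer would drive `tick()` from a clock rather than calling it by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal single-level timing wheel (no rounds). Delays are measured in whole ticks
// and must be smaller than the number of slots. All names here are hypothetical.
class BasicTimingWheel {
    private final List<List<Runnable>> slots = new ArrayList<>();
    private int current = 0; // index of the slot the pointer is on

    BasicTimingWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    // O(1): compute the target slot relative to the pointer and append to its queue.
    int add(int delayTicks, Runnable task) {
        if (delayTicks <= 0 || delayTicks >= slots.size()) {
            throw new IllegalArgumentException("delay must be in [1, slots)");
        }
        int idx = (current + delayTicks) % slots.size();
        slots.get(idx).add(task);
        return idx;
    }

    // Advance the pointer by one tick and fire every task queued in the new slot.
    void tick() {
        current = (current + 1) % slots.size();
        List<Runnable> due = slots.get(current);
        for (Runnable r : due) {
            r.run();
        }
        due.clear();
    }
}
```

The limitation discussed next is visible in `add`: any delay of `slots.size()` ticks or more is rejected.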
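Of course, this basic timing wheel has limitations: it can only distinguish as many expiry times as it has slots, so the time span it covers is too short. There are two common solutions: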
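- Tasks from different rounds share the same slot queue
A wheel with 60 slots, each representing 1 s, can only represent tasks within one minute, for example 21:00:00 to 21:00:59. What about 21:01? We can reuse the same wheel by introducing the concept of rounds: counting from 21:00, 21:01 falls in the second round, and so on. The delay (in ticks) modulo the wheel length gives the slot to insert into, and the delay divided by the wheel length gives the number of rounds to wait. Insertion is still O(1), but checking whether a slot has tasks due now becomes O(n), because every task in the slot's queue must be scanned to compare its round. A min-heap per slot would avoid the scan, but insertion would rise back to O(log n).
- Hierarchical timing wheels
For example, a day can be represented with three levels: an hour wheel (24 slots), a minute wheel (60 slots) and a second wheel (60 slots). Each full revolution of the second wheel advances the minute wheel by one slot, and each full revolution of the minute wheel advances the hour wheel by one slot.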
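The placement arithmetic of both schemes can be sketched as follows, assuming the pointer starts at slot 0 and delays are whole seconds (one tick = 1 s). `WheelPlacement` and its methods are hypothetical. The hierarchical part only shows which slot a delay maps to on each level; a complete hierarchical wheel would put the task on the coarsest level with a non-zero slot and move it down to finer wheels as time passes (cascading), which this sketch leaves out.

```java
// Placement arithmetic for the two schemes. Delays are whole ticks (seconds) and the
// pointer is assumed to start at slot 0. Class and method names are hypothetical.
class WheelPlacement {
    // Rounds scheme: slot = delay mod length, rounds = delay / length.
    static long[] slotAndRounds(long delayTicks, int wheelLength) {
        return new long[] { delayTicks % wheelLength, delayTicks / wheelLength };
    }

    // Hierarchical scheme for one day: split a delay into hour/minute/second slots,
    // like reading a clock (hour wheel 24, minute wheel 60, second wheel 60).
    static int[] hourMinuteSecond(int delaySeconds) {
        if (delaySeconds < 0 || delaySeconds >= 24 * 3600) {
            throw new IllegalArgumentException("delay must be within one day");
        }
        int hours = delaySeconds / 3600;
        int minutes = (delaySeconds % 3600) / 60;
        int seconds = delaySeconds % 60;
        return new int[] { hours, minutes, seconds };
    }
}
```

For example, on a 60-slot wheel a delay of 125 s lands in slot 5 with 2 rounds to wait, while on the three-level day clock a delay of 3725 s corresponds to 1 hour, 2 minutes, 5 seconds.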
Netty's timing wheel: the HashedWheelTimer implementation
HashedWheelTimer uses the rounds scheme.
Constructor:
```java
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {
    // omitted
    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    // omitted
    workerThread = threadFactory.newThread(worker);
    // omitted
}

// Create the circular array of buckets
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    // omitted
    // Round the size up to the nearest power of two
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        // Each slot is initialized as a doubly linked list
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
```
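Rounding the wheel size up to a power of two lets the worker find a slot with `tick & mask` (where `mask = wheel.length - 1`) instead of a modulo. The sketch below re-implements the idea; `PowerOfTwoWheel`, `normalize` and `slotIndex` are hypothetical names rather than Netty code, and the 2^30 upper bound is an assumption chosen so the doubling cannot overflow an `int`.

```java
// Hypothetical illustration of "round up to a power of two" plus the mask trick.
class PowerOfTwoWheel {
    static int normalize(int ticksPerWheel) {
        if (ticksPerWheel <= 0 || ticksPerWheel > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + ticksPerWheel);
        }
        int n = 1;
        while (n < ticksPerWheel) {
            n <<= 1; // double until we reach or pass the requested size
        }
        return n;
    }

    // With a power-of-two length, (tick & mask) equals tick % length for tick >= 0,
    // without a division.
    static int slotIndex(long tick, int length) {
        int mask = length - 1;
        return (int) (tick & mask);
    }
}
```

For instance, asking for 100 slots yields 128, and tick 1000 then maps to slot 104, the same as `1000 % 128`.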
Adding a delayed task: newTimeout
```java
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(task, "task");
    ObjectUtil.checkNotNull(unit, "unit");
    // Count pending tasks
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    // Reject if the limit is exceeded; maxPendingTimeouts defaults to -1 (no limit)
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }
    // Start the worker thread (only the first call actually starts it)
    start();
    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // Wrap the task
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // Enqueue into the MPSC queue `timeouts`
    timeouts.add(timeout);
    return timeout;
}

public void start() {
    // The CAS ensures only one thread ever calls workerThread.start()
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
```
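Two details of `newTimeout()` and `start()` are worth isolating: the CAS on the worker state that lets only the first caller start the thread, and the deadline stored relative to `startTime` with a clamp when the addition overflows. The sketch below models both with a plain `java.util.concurrent.atomic.AtomicInteger` instead of Netty's `AtomicIntegerFieldUpdater`. `LazyStarter` and `relativeDeadline` are hypothetical, and a counter stands in for `workerThread.start()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the lazy, start-once worker and the overflow-guarded deadline.
class LazyStarter {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;
    private final AtomicInteger state = new AtomicInteger(INIT);
    private final AtomicInteger startCount = new AtomicInteger();

    void start() {
        switch (state.get()) {
            case INIT:
                // Only the thread that wins the CAS performs the start.
                if (state.compareAndSet(INIT, STARTED)) {
                    startCount.incrementAndGet(); // stands in for workerThread.start()
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
    }

    int startCount() {
        return startCount.get();
    }

    // Deadline relative to startTime; if the sum overflowed, treat it as "never".
    static long relativeDeadline(long nowNanos, long delayNanos, long startTime) {
        long deadline = nowNanos + delayNanos - startTime;
        if (delayNanos > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        return deadline;
    }
}
```

For instance, `relativeDeadline(1000, Long.MAX_VALUE, 500)` wraps around to a negative value and is clamped to `Long.MAX_VALUE`, so a huge delay means "effectively never" instead of a negative deadline that would fire immediately.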
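A new task is not placed directly into a slot queue of the wheel. It goes into an MPSC (multi-producer, single-consumer) queue first, and the single worker thread then moves tasks from that queue into the wheel. Next, let's look at the worker thread's run method: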
```java
public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        // (1 marks it as initialized)
        startTime = 1;
    }
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();
    do {
        // Wait until the pointer reaches the next tick; each pointer move is one tick
        final long deadline = waitForNextTick();
        // deadline > 0 means the tick was reached normally
        // (waitForNextTick returns Long.MIN_VALUE if interrupted by shutdown)
        if (deadline > 0) {
            // Locate the current slot (tick & mask == tick % wheel.length)
            int idx = (int) (tick & mask);
            // Remove cancelled tasks: cancelling a timeout puts it into the cancelled queue
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // Move tasks from the MPSC queue into their slot queues, at most 100000 per tick,
            // so a thread adding tasks in a loop cannot monopolize the worker
            transferTimeoutsToBuckets();
            // Expire the timeouts in the current bucket
            bucket.expireTimeouts(deadline);
            // Advance the tick
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    // After stop(): collect the tasks that never ran
    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket: wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
```
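The article does not show `transferTimeoutsToBuckets()`, which is where the rounds are computed. In Netty 4.1, as far as I know, it derives the expiry tick as `deadline / tickDuration`, sets `remainingRounds = (calculated - tick) / wheel.length`, and uses `max(calculated, tick) & mask` as the slot so that overdue tasks land in the current slot. The standalone method below re-derives that arithmetic under those assumptions; `BucketAssignment` and `assign` are hypothetical names, so check against the Netty version you use.

```java
// Standalone re-derivation of how a queued timeout is assigned to a bucket, modeled on
// (not copied from) Netty's transferTimeoutsToBuckets(). Names are hypothetical.
class BucketAssignment {
    final int index;            // slot the timeout goes into
    final long remainingRounds; // full revolutions to wait; <= 0 means due on the next visit

    BucketAssignment(int index, long remainingRounds) {
        this.index = index;
        this.remainingRounds = remainingRounds;
    }

    // deadlineNanos is relative to startTime; currentTick is the worker's tick counter.
    // wheelLength is assumed to be a power of two.
    static BucketAssignment assign(long deadlineNanos, long tickDurationNanos,
                                   long currentTick, int wheelLength) {
        int mask = wheelLength - 1;
        long calculated = deadlineNanos / tickDurationNanos; // tick at which it expires
        long rounds = (calculated - currentTick) / wheelLength;
        // An overdue deadline goes into the current slot instead of a slot already passed.
        long ticks = Math.max(calculated, currentTick);
        return new BucketAssignment((int) (ticks & mask), rounds);
    }
}
```

With a 100 ms tick and 8 slots, a deadline 1.3 s after `startTime` seen at tick 0 gives expiry tick 13, that is slot 5 with 1 remaining round.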
Now let's look at the expireTimeouts method:
```java
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;
    // process all timeouts
    // Walk the bucket's list: run tasks whose remaining rounds reached 0, decrement the rest
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) { // rounds <= 0: unlink and run
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                timeout.expire(); // calls the task's run method
            } else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}
```
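A simplified model of `expireTimeouts()` shows why scanning a slot is O(n) in its length and how `remainingRounds` counts down. The bucket here is an `ArrayList` rather than Netty's intrusive doubly linked list, and the deadline sanity check and the cancellation branch are left out; `SimpleBucket` and `Entry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model of HashedWheelBucket.expireTimeouts(). Names are hypothetical.
class SimpleBucket {
    static final class Entry {
        final String name;
        long remainingRounds;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void add(Entry e) {
        entries.add(e);
    }

    // Called each time the pointer lands on this slot. O(n) in the slot's size:
    // due entries are removed and returned, the others lose one round.
    List<String> expire() {
        List<String> fired = new ArrayList<>();
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();
                fired.add(e.name); // Netty would call timeout.expire() here
            } else {
                e.remainingRounds--;
            }
        }
        return fired;
    }
}
```

An entry added with 2 remaining rounds is decremented on the first two visits and fires on the third, while an entry with 0 rounds fires on the first visit.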
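Summary
Netty uses a single-level timing wheel with rounds rather than the more complex hierarchical wheel. A new task is first put into an MPSC queue, and one worker thread consumes that queue; because the worker is the only thread that touches the buckets, they need no synchronization. Each time the pointer reaches a slot, the worker walks that slot's queue, and a task whose remaining rounds are less than or equal to 0 is due in this tick.
Netty's timing wheel design is fairly simple. Because new tasks go through the MPSC queue, adding a task is effectively O(1). However, all tasks are executed by the single worker thread, so execution slows down when there are many tasks, and a long-running task delays the tasks scheduled for later ticks.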