【netty学习笔记十八】netty时间轮

我们知道JDK原生的Timer、ScheduledThreadPoolExecutor都可以处理定时任务。但它们都是基于小顶堆/大顶堆,因此添加任务和删除任务的时间复杂度都是 O(Log2n) 。在任务数量很大的情况下,性能表现比较差。
在论文《Hashed and Hierarchical Timing Wheels》中有一种时间轮的数据结构,在处理延迟任务时,时间复杂度可降低到O(1)。
这个盗个图:


image.png

顾名思义,时间轮和钟表一样。每个时间格子称之为槽位,代表时间单位,如1秒。拥有的槽位个数就是最大任务个数,每个槽位存放延迟任务队列。当一个延迟任务插入时,先计算延迟时间与单位时间的余值,看该任务要插入哪个槽内的延迟队列。时间轮每过一个单位时间,指针指向下一个槽位,该槽位内的任务队列全部触发。
当然了,这种基本的时间轮有诸多限制,如只能支持槽位个数的任务数、时间跨度太小。解决方案有:

  1. 不同轮次的延迟任务共存相同的延迟队列
    一个60个槽位的时间轮,每个槽位代表1s,那么这个时间轮只能表示1分钟的任务,如21:00,那21:01呢,当然可以复用这个时间轮,那就要引入轮次的概念。如果从21:00开始算,那21:01是第二轮,依次类推。延迟时间对时间轮长度求余即要插入的槽位。这样插入的时间复杂度仍然是O(0),获取是否有任务时间复杂度变为O(n)。当然可以用小顶堆来操作,但这样时间插入复杂度又会上升到O(log2n)
  2. 多层次时间轮
    如一天的时间可以用三级时间轮:小时时间轮(24),分钟时间轮(60),秒时间轮(60),秒时间轮走一圈,分钟时间轮走一格;分钟时间轮走一圈,小时时间轮走一格。

netty时间轮HashedWheelTimer实现

HashedWheelTimer是采用轮次方案
构造方法:

public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        //省略
        // Normalize ticksPerWheel to power of two and initialize the wheel.
        wheel = createWheel(ticksPerWheel);
        //省略
        workerThread = threadFactory.newThread(worker);

         //省略
    }
//创建循环数组
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        //略
        //设置大小为最近的2的n次方
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            //每个槽位初始化为一个双向链表
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

新增延时任务newTimeout

@Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(task, "task");
        ObjectUtil.checkNotNull(unit, "unit");
        //任务计数
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        //判断任务数是否超过最大限制,默认maxPendingTimeouts =-1,无限制
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
        //开启任务工作线程worker
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        //封装任务
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        //放入Mpsc队列timesout
        timeouts.add(timeout);
        return timeout;
    }

public void start() {
        //只有一个线程能执行workerThread.start()
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

新增任务时,不是直接放到时间轮的延迟队列中,而是放到Mpsc队列。然后由单线程worker提取任务。继续看下worker线程的run方法:

public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.  1表示初始化
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                //等待指针到下一个tick,指针移动一次为一个tick
                final long deadline = waitForNextTick();
                 //deadline大于0说明有任务
                if (deadline > 0) {
                    //确定在哪个槽位
                    int idx = (int) (tick & mask);
                    //删除被取消的队列,当我们删除任务时,任务就会进入取消队列
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    //循环将队列中的任务放入槽内的延迟队列,每次最多转10000个,防止占CPU过多时间
                    transferTimeoutsToBuckets();
                     //更新延迟队列中的任务时间
                    bucket.expireTimeouts(deadline);
                    //更新tick
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
            
             //stop停止后清理任务
            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

这里我们再看下expireTimeouts方法

public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            //遍历延迟队列,处理所有可执行任务,时间没到的轮次-1,否则执行
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {//轮次<=0,删除并执行
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        timeout.expire(); //会调用任务的run方法
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

总结

netty这里使用轮次时间轮,而不是更复杂的多层时间轮。每次新增任务时是先放到MSPC队列中,然后一个worker线程消费MSPC队列(不用考虑并发问题),时间每到一个槽位,遍历槽位的延迟队列,轮次小于等于0就代表本次要执行了。
netty时间轮设计还是比较简单的,因为新增任务使用了MSPC队列,时间复杂度算是O(1),单执行任务时只有一个work线程,任务多时执行较慢,而且会受之前的任务执行时间影响。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。