【netty学习笔记十八】netty时间轮

我们知道JDK原生的Timer、ScheduledThreadPoolExecutor都可以处理定时任务。但它们都是基于小顶堆/大顶堆,因此添加任务和删除任务的时间复杂度都是 O(Log2n) 。在任务数量很大的情况下,性能表现比较差。
在论文《Hashed and Hierarchical Timing Wheels》中有一种时间轮的数据结构,在处理延迟任务时,时间复杂度可降低到O(1)。
这个盗个图:


image.png

顾名思义,时间轮和钟表一样。每个时间格子称之为槽位,代表时间单位,如1秒。拥有的槽位个数就是最大任务个数,每个槽位存放延迟任务队列。当一个延迟任务插入时,先计算延迟时间与单位时间的余值,看该任务要插入哪个槽内的延迟队列。时间轮每过一个单位时间,指针指向下一个槽位,该槽位内的任务队列全部触发。
当然了,这种基本的时间轮有诸多限制,如只能支持槽位个数的任务数、时间跨度太小。解决方案有:

  1. 不同轮次的延迟任务共存相同的延迟队列
    一个60个槽位的时间轮,每个槽位代表1s,那么这个时间轮只能表示1分钟的任务,如21:00,那21:01呢,当然可以复用这个时间轮,那就要引入轮次的概念。如果从21:00开始算,那21:01是第二轮,依次类推。延迟时间对时间轮长度求余即要插入的槽位。这样插入的时间复杂度仍然是O(0),获取是否有任务时间复杂度变为O(n)。当然可以用小顶堆来操作,但这样时间插入复杂度又会上升到O(log2n)
  2. 多层次时间轮
    如一天的时间可以用三级时间轮:小时时间轮(24),分钟时间轮(60),秒时间轮(60),秒时间轮走一圈,分钟时间轮走一格;分钟时间轮走一圈,小时时间轮走一格。

netty时间轮HashedWheelTimer实现

HashedWheelTimer是采用轮次方案
构造方法:

public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        //省略
        // Normalize ticksPerWheel to power of two and initialize the wheel.
        wheel = createWheel(ticksPerWheel);
        //省略
        workerThread = threadFactory.newThread(worker);

         //省略
    }
//创建循环数组
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        //略
        //设置大小为最近的2的n次方
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            //每个槽位初始化为一个双向链表
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

新增延时任务newTimeout

@Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(task, "task");
        ObjectUtil.checkNotNull(unit, "unit");
        //任务计数
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        //判断任务数是否超过最大限制,默认maxPendingTimeouts =-1,无限制
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
        //开启任务工作线程worker
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        //封装任务
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        //放入Mpsc队列timesout
        timeouts.add(timeout);
        return timeout;
    }

public void start() {
        //只有一个线程能执行workerThread.start()
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

新增任务时,不是直接放到时间轮的延迟队列中,而是放到Mpsc队列。然后由单线程worker提取任务。继续看下worker线程的run方法:

public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.  1表示初始化
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                //等待指针到下一个tick,指针移动一次为一个tick
                final long deadline = waitForNextTick();
                 //deadline大于0说明有任务
                if (deadline > 0) {
                    //确定在哪个槽位
                    int idx = (int) (tick & mask);
                    //删除被取消的队列,当我们删除任务时,任务就会进入取消队列
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    //循环将队列中的任务放入槽内的延迟队列,每次最多转10000个,防止占CPU过多时间
                    transferTimeoutsToBuckets();
                     //更新延迟队列中的任务时间
                    bucket.expireTimeouts(deadline);
                    //更新tick
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
            
             //stop停止后清理任务
            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

这里我们再看下expireTimeouts方法

public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            //遍历延迟队列,处理所有可执行任务,时间没到的轮次-1,否则执行
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {//轮次<=0,删除并执行
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        timeout.expire(); //会调用任务的run方法
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

总结

netty这里使用轮次时间轮,而不是更复杂的多层时间轮。每次新增任务时是先放到MSPC队列中,然后一个worker线程消费MSPC队列(不用考虑并发问题),时间每到一个槽位,遍历槽位的延迟队列,轮次小于等于0就代表本次要执行了。
netty时间轮设计还是比较简单的,因为新增任务使用了MSPC队列,时间复杂度算是O(1),单执行任务时只有一个work线程,任务多时执行较慢,而且会受之前的任务执行时间影响。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351