模拟kafka时间轮

延迟功能调度器

public class SystemTimer {
    // 执行任务线程池
    private ExecutorService taskExecutor = Executors.newFixedThreadPool(1, runnable -> {
        Thread thread = new Thread(runnable, "executor-pool");
        thread.setDaemon(true);
        thread.setUncaughtExceptionHandler((t, e) -> System.err.println("Uncaught exception in thread '" + t.getName() + "':" + e));
        return thread;
    });
    // 延迟队列
    private DelayQueue<TimerTaskList> delayQueue = new DelayQueue();
    // 时间轮
    private TimingWheel timingWheel = new TimingWheel(1000L, 5, System.currentTimeMillis(), delayQueue);

    private static SystemTimer INSTANCE;

    public static SystemTimer getInstance() {
        if (INSTANCE == null) {
            synchronized (SystemTimer.class) {
                if (INSTANCE == null) {
                    INSTANCE = new SystemTimer();
                }
            }
        }
        return INSTANCE;
    }

    private SystemTimer() {
        new Thread(() -> {
            while (true){
                advanceClock(1000L);
            }
        }).start();
    }

    /**
     * 推动时间轮转动
     * @param timeoutMs
     */
    private void advanceClock(Long timeoutMs) {
        try {
            TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (bucket != null){
                timingWheel.advanceClock(bucket.getExpiration());
                bucket.flush(this::addTask);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 添加任务
     * @param timedTask
     */
    public void addTask(TimerTaskEntry timedTask) {
        if (!timingWheel.addTask(timedTask)) {// 到期或取消
            if (!timedTask.isCancle()) {// 到期立即执行
                taskExecutor.submit(timedTask.getTask());
            }
        }
    }
}

时间轮

public class TimingWheel {
    // 一个槽表示的时间范围
    private Long tickMs;
    // 轮大小
    private Integer wheelSize;
    // 一个时间轮表示的时间范围
    private Long interval;
    // 时间轮指针
    private volatile long currentTime;
    private TimerTaskList[] buckets;
    private DelayQueue<TimerTaskList> delayQueue;
    // 上层时间轮
    private volatile TimingWheel overflowWheel;

    public TimingWheel(Long tickMs, Integer wheelSize, Long currentTime, DelayQueue<TimerTaskList> delayQueue) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.buckets = new TimerTaskList[wheelSize];
        this.currentTime = currentTime - (currentTime % tickMs);
        this.delayQueue = delayQueue;
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new TimerTaskList(UUID.randomUUID().toString());
        }
    }

    /**
     * 添加任务到对应的槽
     * @param timedTask
     * @return
     */
    public boolean addTask(TimerTaskEntry timedTask) {
        Long expirationMs = timedTask.getExpirationMs();
        if(timedTask.isCancle()){// 任务取消
            return false;
        }
        long delayMs = expirationMs - currentTime;
        if(delayMs < tickMs){// 任务到期
            return false;
        }else if(delayMs < interval){// 添加到对应的槽
            long virtualId = expirationMs / tickMs;
            int bucketIndex = (int) (virtualId % wheelSize);
            TimerTaskList bucket = buckets[bucketIndex];
            bucket.addTask(timedTask);
            // 设置槽过期时间并将任务入队
            if(bucket.setExpiration(expirationMs - (expirationMs % tickMs))){
                delayQueue.offer(bucket);
            }
            return true;
        }else{// 添加到上层时间轮
            if(overflowWheel == null){
                addOverflowWheel();
            }
            overflowWheel.addTask(timedTask);
            return true;
        }
    }

    /**
     * 尝试推荐时间轮
     * @param expiration
     */
    public void advanceClock(Long expiration) {
        if(expiration >= currentTime + tickMs){
            currentTime = expiration - (expiration % tickMs);
            if(overflowWheel != null){
                overflowWheel.advanceClock(expiration);
            }
        }
    }

    private TimingWheel addOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    // 注意这里第一个参数为interval
                    overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue);
                }
            }
        }
        return overflowWheel;
    }
}

槽(任务列表)

public class TimerTaskList implements Delayed {
    // 唯一标识
    private String id;
    // 槽过期时间
    private AtomicLong expiration = new AtomicLong(-1L);

    private TimerTaskEntry root = new TimerTaskEntry(-1L,null);

    {
        root.next = root;
        root.prev = root;
    }

    public TimerTaskList(String id) {
        this.id = id;
    }

    /**
     * 添加任务
     * @param timedTask
     */
    public void addTask(TimerTaskEntry timedTask) {
        synchronized (this) {
            timedTask.bucket = this;
            TimerTaskEntry tail = root.prev;

            timedTask.next = root;
            timedTask.prev = tail;

            tail.next = timedTask;
            root.prev = timedTask;
        }
    }

    /**
     * 删除任务
     * @param timedTask
     */
    public void removeTask(TimerTaskEntry timedTask) {
        synchronized (this) {
            if (timedTask.bucket.id.equals(this.id)) {
                timedTask.next.prev = timedTask.prev;
                timedTask.prev.next = timedTask.next;
                timedTask.bucket = null;
                timedTask.next = null;
                timedTask.prev = null;
            }
        }
    }

    /**
     * 重新分配槽
     * 执行当前槽任务时会调用该方法
     * @param timedTaskFlush
     */
    public void flush(Consumer<TimerTaskEntry> timedTaskFlush) {
        synchronized (this){
            TimerTaskEntry timedTask = root.next;
            while (timedTask != null && !timedTask.equals(root)){
                removeTask(timedTask);
                timedTaskFlush.accept(timedTask);
                timedTask = root.next;
            }
            expiration.set(-1);
        }
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
    }

    @Override
    public int compareTo(Delayed o) {
        if (o instanceof TimerTaskList) {
            return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
        }
        return 0;
    }

    public Boolean setExpiration(Long expiration) {
        return this.expiration.getAndSet(expiration) != expiration;
    }

    public Long getExpiration() {
        return expiration.get();
    }
}

任务

public class TimerTaskEntry  {

    // 延迟时间
    private Long delayMs;
    // 任务
    private Runnable task;
    // 过期时间戳
    private Long expirationMs;
    private AtomicBoolean cancel;
    protected TimerTaskEntry next;
    protected TimerTaskEntry prev;
    protected TimerTaskList bucket;

    public TimerTaskEntry(Long delayMs, Runnable task) {
        this.delayMs = delayMs;
        this.task = task;
        this.expirationMs = System.currentTimeMillis() + delayMs;
        this.cancel = new AtomicBoolean(false);
        this.next = this.prev = null;
        this.bucket = null;
    }

    public boolean isCancle() {
        return cancel.get();
    }

    public boolean cancel(){
        return cancel.compareAndSet(false,true);
    }

    public Runnable getTask() {
        return task;
    }

    public Long getExpirationMs() {
        return expirationMs;
    }

}

测试

public class Test {

    public static void main(String[] args) throws InterruptedException {
        SystemTimer systemTimer = SystemTimer.getInstance();

        TimerTaskEntry taskEntry1 = new TimerTaskEntry(1000L, () -> System.out.println("任务执行1->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry2 = new TimerTaskEntry(2000L, () -> System.out.println("任务执行2->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry3 = new TimerTaskEntry(3000L, () -> System.out.println("任务执行3->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry4 = new TimerTaskEntry(4000L, () -> System.out.println("任务执行4->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry5 = new TimerTaskEntry(5000L, () -> System.out.println("任务执行5->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry6 = new TimerTaskEntry(6000L, () -> System.out.println("任务执行6->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry7 = new TimerTaskEntry(10000L, () -> System.out.println("任务执行7->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry8 = new TimerTaskEntry(20000L, () -> System.out.println("任务执行8->" + System.currentTimeMillis()));


        systemTimer.addTask(taskEntry1);
        systemTimer.addTask(taskEntry2);
        systemTimer.addTask(taskEntry3);
        systemTimer.addTask(taskEntry4);
        systemTimer.addTask(taskEntry5);
        systemTimer.addTask(taskEntry6);
        systemTimer.addTask(taskEntry7);
        systemTimer.addTask(taskEntry8);

        Thread.sleep(20000);

        new Thread(() -> {
            TimerTaskEntry taskEntry9 = new TimerTaskEntry(1500L, () -> System.out.println("任务执行9->" + System.currentTimeMillis()));
            systemTimer.addTask(taskEntry9);
        }).start();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容

  • [TOC]在kafka中,有许多请求并不是立即返回,而且处理完一些异步操作或者等待某些条件达成后才返回,这些请求一...
    tracy_668阅读 2,209评论 0 1
  • 零、时间轮定义 简单说说时间轮吧,它是一个高效的延时队列,或者说定时器。实际上现在网上对于时间轮算法的解释很多,定...
    Java大生阅读 1,953评论 1 0
  • 时间轮:高效延时队列(定时器)。 Kafka中时间轮(TimingWheel)存储定时任务环形队列,底层数组实现,...
    hedgehog1112阅读 1,228评论 0 8
  • 时间轮 Kafka中存在大量的延迟操作,比如延迟生产,延迟拉取,延迟加入,延迟心跳等。kafka使用时间轮(Tim...
    绍圣阅读 1,300评论 0 0
  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,523评论 16 22