手动实现一个时间轮

手动实现一个时间轮

// 包 timewheel 提供了一个简单的时间轮调度器的实现。
package timewheel

import (
    "container/list" // 引入 list 包来使用双向链表
    "context"        // 引入 context 包来进行上下文控制
    "time"           // 引入 time 包来处理时间相关的功能
)

// Task 接口定义了需要时间轮执行的任务,它必须实现 Execute 方法。
type Task interface {
    Execute()
}

// TimeNode 结构体表示时间轮上的一个节点。
type TimeNode struct {
    task    Task      // 任务实例
    excTime time.Time // 预定的执行时间
    cycle   int       // 时间轮需要转动多少圈后执行任务
}

// TimeWheel 结构体表示时间轮。
type TimeWheel struct {
    startTime  time.Time     // 时间轮的启动时间
    interval   time.Duration // 时间轮的时间间隔
    currentPos int           // 当前的槽位置
    ticker     *time.Ticker  // 定时触发器
    slots      []*list.List  // 时间轮的槽数组,每个槽点是一个链表
    tasks      chan Task     // 任务channel,用于执行任务
    stop       chan struct{} // 停止channel,用于停止时间轮
    cancel     context.CancelFunc // 上下文取消函数
}

// NewTimeWheel 创建并初始化一个时间轮实例。
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
    slots := make([]*list.List, slotNum)
    for i := range slots {
        slots[i] = list.New()
    }
    return &TimeWheel{
        startTime:  time.Now().UTC(),
        interval:   interval,
        currentPos: 0,
        ticker:     time.NewTicker(interval),
        slots:      slots,
        tasks:      make(chan Task, 1000),
        stop:       make(chan struct{}),
    }
}

// DefaultTimeWheel 创建一个默认配置的时间轮实例。
func DefaultTimeWheel() *TimeWheel {
    return NewTimeWheel(time.Second, 60)
}

// Start 启动时间轮,使其开始调度任务。
func (t *TimeWheel) Start() {
    ctx := context.Background()
    ctx, cancelFunc := context.WithCancel(ctx)
    t.cancel = cancelFunc
    go t.Loop(ctx)
    go func() {
        for {
            select {
            case task := <-t.tasks: // 当任务channel中有任务时,执行任务
                task.Execute()
            case <-t.stop: // 当接收到停止信号时,停止定时器并退出
                t.ticker.Stop()
                return
            }
        }

    }()
}

// Stop 停止时间轮,取消所有调度。
func (t *TimeWheel) Stop() {
    t.stop <- struct{}{} // 发送停止信号
    t.cancel()           // 调用上下文的取消函数
}

// Loop 是时间轮的核心调度循环。
func (t *TimeWheel) Loop(ctx context.Context) {
    for {
        select {
        case <-ctx.Done(): // 当上下文被取消时退出循环
            return
        case <-t.ticker.C: // 定时器触发时执行
            t.currentPos = (t.currentPos + 1) % len(t.slots) // 计算当前槽位置
            l := t.slots[t.currentPos]
            for e := l.Front(); e != nil; {
                node := e.Value.(*TimeNode)
                if node.cycle > 0 {
                    node.cycle-- // 减少圈数
                    e = e.Next()
                    continue
                }
                t.tasks <- node.task // 将任务发送到任务channel
                next := e.Next()
                l.Remove(e) // 移除已执行的任务
                e = next
            }

        }
    }
}

// AddTask 添加任务到时间轮。
func (t *TimeWheel) AddTask(task Task, delay time.Duration) {
    pos := (t.currentPos + int(delay/t.interval)) % len(t.slots) // 计算任务应该放入的槽位置
    cycle := int(delay/t.interval) / len(t.slots) // 计算任务需要等待的圈数

    if t.slots[pos] == nil {
        t.slots[pos] = list.New()
    }
    t.slots[pos].PushBack(
        &TimeNode{
            task:    task,
            excTime: time.Now().Add(delay),
            cycle:   cycle,
        },
    )
}

说明文档

1. 概述

时间轮(TimeWheel)是一个用于任务调度的数据结构,它允许你以固定的时间间隔调度任务。这段Go代码提供了一个简单的时间轮实现。

2. 结构体和接口

  • Task: 一个接口,所有希望被时间轮调度的任务都应该实现这个接口的 Execute 方法。
  • TimeNode: 代表时间轮上的节点,持有一个任务和任务的执行时间以及剩余圈数。
  • TimeWheel: 时间轮的主体,包含了时间轮的各项配置,如时间间隔、槽数组等,并且可以开始和停止任务调度。

3. 方法

  • NewTimeWheel: 创建并返回一个新的时间轮实例。
  • DefaultTimeWheel: 创建一个拥有默认配置(一秒钟的间隔,60个槽)的时间轮实例。
  • Start: 启动时间轮,开始调度任务。
  • Stop: 停止时间轮,取消所有调度任务。
  • Loop: 作为时间轮的核心调度循环,负责移动槽位置,执行到期的任务。
  • AddTask: 向时间轮添加任务,可以指定延迟时间。

4. 使用方式

创建时间轮实例,通过 AddTask 方法添加任务,然后调用 Start 方法开始调度。当不再需要时间轮时,可以调用 Stop 方法来停止所有调度。

5. 注意事项

  • 时间轮初始化时会创建一个任务通道 tasks,其大小为1000,因此它可以在不阻塞的情况下缓存多达1000个待执行的任务。
  • 时间轮的 Loop 方法使用了Go的 select 语句来高效的处理任务的执行和时间轮的停止。
  • 在停止时间轮时,应确保所有的任务都已经完成或不需要执行,以避免资源泄露或未完成的任务。
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容