取号队列模型解决长连接顺序发送问题

取号队列顾名思义是取号排队系统的实现,取号排队系统无论是现实空间还是计算机空间都是非常场景的一种模型,例如餐厅取号点餐,这里借用这个模型完美的可以在并发不加锁情况下解决异步发送数据包顺序化

  • 取号:顾客可以不阻塞的获取号码和点餐,长连接发送数据可以不- 阻塞发送(会缓存一部分数据)

  • 叫号:顾客由餐厅前台统一按照号码顺序叫号,长连接根据缓存数据序号顺序发送

package utakenumberqueue

import (
    "context"
    "fmt"
    "git.umu.work/be/goframework/logger"
    "time"
)

type EndNumberValue struct{}

// TakeNumberQueue 取号队列
type TakeNumberQueue struct {
    offset  int32
    queue   map[int32]interface{}
    out     chan interface{}
    exit    chan struct{}
    timer   *time.Timer
    timeout time.Duration
}

func NewTaskNumberQueue(ctx context.Context, size int, timeout time.Duration) *TakeNumberQueue {
    timer := time.NewTimer(timeout)
    tq := &TakeNumberQueue{
        offset:  0,
        queue:   make(map[int32]interface{}, 100),
        out:     make(chan interface{}, size),
        exit:    make(chan struct{}),
        timer:   timer,
        timeout: timeout,
    }
    go func() {
        for {
            select {
            case <-timer.C:
                close(tq.exit)
                err := tq.Close(ctx)
                if err != nil {
                    logger.GetLogger(ctx).Warn(err.Error())
                }
            }
        }
    }()

    return tq
}

func (t *TakeNumberQueue) checkIsEnd(ctx context.Context, value interface{}) bool {
    select {
    case <-t.exit:
        return true
    default:
        _, ok := value.(EndNumberValue)
        if ok {
            return true
        }
    }

    return false
}

// TakeNumber 取号
func (t *TakeNumberQueue) TakeNumber(ctx context.Context, number int32, value interface{}) error {
    t.timer.Reset(t.timeout)
    if number == t.offset {
        isEnd := t.checkIsEnd(ctx, value)
        if isEnd {
            t.timer.Stop()
            close(t.out)
            return nil
        }
        t.out <- value
        t.offset += 1
        ok := true
        // 自旋
        var v interface{}
        for ok {
            v, ok = t.queue[t.offset]
            if ok {
                isEnd = t.checkIsEnd(ctx, value)
                if isEnd {
                    logger.GetLogger(ctx).Info("task number queue is closed")
                    close(t.out)
                    return nil
                }
                t.out <- v
                t.offset += 1
            }
        }
    } else if number < t.offset {
        logger.GetLogger(ctx).Debug(fmt.Sprintf("number %+v is less than offset %+v", number, t.offset))
        isEnd := t.checkIsEnd(ctx, value)
        if isEnd {
            logger.GetLogger(ctx).Info("task number queue is closed")
            close(t.out)
            return nil
        }
        t.out <- value
    } else {
        t.queue[number] = value
    }

    return nil
}

// Call 叫号
func (t *TakeNumberQueue) Call(ctx context.Context) (<-chan interface{}, error) {
    return t.out, nil
}

func (t *TakeNumberQueue) TakeEndNumber(ctx context.Context, number int32) error {
    logger.GetLogger(ctx).Info(fmt.Sprintf("task number queue end number is %+v", number))
    var end EndNumberValue = struct{}{}
    err := t.TakeNumber(ctx, number, end)
    if err != nil {
        return err
    }

    return nil
}

// Close 关闭,退出queue
func (t *TakeNumberQueue) Close(ctx context.Context) error {
    closeOutTimer := time.NewTimer(10 * time.Second)
    select {
    case _, ok := <-t.out:
        if ok {
            close(t.out)
        }
    case _, ok := <-t.exit:
        if ok {
            close(t.exit)
        }
    case <-closeOutTimer.C:
        closeOutTimer.Stop()
    }

    return nil
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容