取号队列顾名思义是取号排队系统的实现,取号排队系统无论是现实空间还是计算机空间都是非常场景的一种模型,例如餐厅取号点餐,这里借用这个模型完美的可以在并发不加锁情况下解决异步发送数据包顺序化
取号:顾客可以不阻塞的获取号码和点餐,长连接发送数据可以不- 阻塞发送(会缓存一部分数据)
叫号:顾客由餐厅前台统一按照号码顺序叫号,长连接根据缓存数据序号顺序发送
package utakenumberqueue
import (
"context"
"fmt"
"git.umu.work/be/goframework/logger"
"time"
)
type EndNumberValue struct{}
// TakeNumberQueue 取号队列
type TakeNumberQueue struct {
offset int32
queue map[int32]interface{}
out chan interface{}
exit chan struct{}
timer *time.Timer
timeout time.Duration
}
func NewTaskNumberQueue(ctx context.Context, size int, timeout time.Duration) *TakeNumberQueue {
timer := time.NewTimer(timeout)
tq := &TakeNumberQueue{
offset: 0,
queue: make(map[int32]interface{}, 100),
out: make(chan interface{}, size),
exit: make(chan struct{}),
timer: timer,
timeout: timeout,
}
go func() {
for {
select {
case <-timer.C:
close(tq.exit)
err := tq.Close(ctx)
if err != nil {
logger.GetLogger(ctx).Warn(err.Error())
}
}
}
}()
return tq
}
func (t *TakeNumberQueue) checkIsEnd(ctx context.Context, value interface{}) bool {
select {
case <-t.exit:
return true
default:
_, ok := value.(EndNumberValue)
if ok {
return true
}
}
return false
}
// TakeNumber 取号
func (t *TakeNumberQueue) TakeNumber(ctx context.Context, number int32, value interface{}) error {
t.timer.Reset(t.timeout)
if number == t.offset {
isEnd := t.checkIsEnd(ctx, value)
if isEnd {
t.timer.Stop()
close(t.out)
return nil
}
t.out <- value
t.offset += 1
ok := true
// 自旋
var v interface{}
for ok {
v, ok = t.queue[t.offset]
if ok {
isEnd = t.checkIsEnd(ctx, value)
if isEnd {
logger.GetLogger(ctx).Info("task number queue is closed")
close(t.out)
return nil
}
t.out <- v
t.offset += 1
}
}
} else if number < t.offset {
logger.GetLogger(ctx).Debug(fmt.Sprintf("number %+v is less than offset %+v", number, t.offset))
isEnd := t.checkIsEnd(ctx, value)
if isEnd {
logger.GetLogger(ctx).Info("task number queue is closed")
close(t.out)
return nil
}
t.out <- value
} else {
t.queue[number] = value
}
return nil
}
// Call 叫号
func (t *TakeNumberQueue) Call(ctx context.Context) (<-chan interface{}, error) {
return t.out, nil
}
func (t *TakeNumberQueue) TakeEndNumber(ctx context.Context, number int32) error {
logger.GetLogger(ctx).Info(fmt.Sprintf("task number queue end number is %+v", number))
var end EndNumberValue = struct{}{}
err := t.TakeNumber(ctx, number, end)
if err != nil {
return err
}
return nil
}
// Close 关闭,退出queue
func (t *TakeNumberQueue) Close(ctx context.Context) error {
closeOutTimer := time.NewTimer(10 * time.Second)
select {
case _, ok := <-t.out:
if ok {
close(t.out)
}
case _, ok := <-t.exit:
if ok {
close(t.exit)
}
case <-closeOutTimer.C:
closeOutTimer.Stop()
}
return nil
}