Golang无限缓存channel

需求

最近在弄一个游戏的gate网关转发服务器,服务器之间使用的是nats通讯,gate的作用是接收客户端发来的消息转发到对应的服务器上,并从nats上获取游戏服务器发送给客户端的消息并转发给客户端。前面接收还好处理,因为都是发布订阅模式的消息,收到消息直接向nats上扔就行了。但转发服务器来的消息就不一样了,从nats上取的速度远大于gate转发给客户端的速度,会有数据囤积在nats中。为了解决这个问题,可以一个协和去nats中取数据,用多个协程并行转发给客户端,因为现在cpu都是n核的,多协程转发肯定会快的不止一点点,这里要注意一点的是,同个玩家的消息转发的顺序不能变,就是按一定的规则把同一个玩家的所有消息在同一个协程上转发就可以了。
为了现实这个转发,需要一个无限缓存的channel,先装nats中的数据读出来分别放到各转发协程的channal中,转发协程只从自己的channel取数据一条一条慢慢转发给客户端就行了。

设计

先说一下设计无限缓存channel的大至思路,然后直接上完整代码。

分析

要达到前面的需求,我们需要设计的无限缓存channel应该满足几个要求:

  1. 缓存无限。因为不知道nats那边会接收到多少消息,而转发可能会因网络波动阻塞。
  2. 不能阻塞写。要保证接收nats消息的协程能及时处理所有的消息,并写入转发协程的channel中。
  3. 无数据时阻塞读,此特性保持和普通channle一样。没数据时转发协程处理阻塞等待。
  4. 读写都应通过channle操作,和普通channel的操作一样。
  5. channle被关闭后,未读取的数据应该仍然可读,此特性和普通channle保持一致。

针对上面的要求,设计如下:

  1. 因为go暂不支持操作符重载,所以封装一个结构体,包含二个channel来分离读(Out Channel)和写(In Channel)。
  2. 因为channel的缓存大小是有限的,需要一个可无限扩容缓存多于的数据,这里可以使用ringbuffer来实现。
  3. 无限缓存内部数据FIFO实现

当Out Channel还没有满时,并且buf中没有数据,读取In中数据,将其放入Out,直到Out满
当Buf中有数据时,无论Out是否满,都将将In中读到的数据,直接写入到Buf中,目的就是为了保证数据的FIFO原则

实现代码

完整工程参见:https://github.com/zngw/zchan

1、双向环形链表

package ringbuffer

import (
    "errors"
    "fmt"
    "sync/atomic"
)

type T interface{}

var ErrIsEmpty = errors.New("ringbuffer is empty")

type cell struct {
    Data     []T   // 数据部分
    fullFlag bool  // cell满的标志
    next     *cell // 指向后一个cellBuffer
    pre      *cell // 指向前一个cellBuffer

    r int // 下一个要读的指针
    w int // 下一个要下的指针
}

type RingBuffer struct {
    cellSize  int   // cell大小
    cellCount int   // cell数量
    count     int32 // 有效元素个数

    readCell  *cell // 下一个要读的cell
    writeCell *cell // 下一个要写的cell
}

// NewRingBuffer 新建一个RingBuffer,包含两个cell
func NewRingBuffer(cellSize int) (buf *RingBuffer, err error) {
    if cellSize <= 0 || cellSize&(cellSize-1) != 0 {
        err = fmt.Errorf("初始大小必须是 2 的幂")
        return
    }

    rootCell := &cell{
        Data: make([]T, cellSize),
    }
    lastCell := &cell{
        Data: make([]T, cellSize),
    }
    rootCell.pre = lastCell
    lastCell.pre = rootCell
    rootCell.next = lastCell
    lastCell.next = rootCell

    buf = &RingBuffer{
        cellSize:  cellSize,
        cellCount: 2,
        count:     0,
        readCell:  rootCell,
        writeCell: rootCell,
    }

    return
}

// Read 读取数据
func (r *RingBuffer) Read() (data T, err error) {
    // 无数据
    if r.IsEmpty() {
        err = ErrIsEmpty
        return
    }

    // 读取数据,并将读指针向右移动一位
    data = r.readCell.Data[r.readCell.r]
    r.readCell.r++
    atomic.AddInt32(&r.count, -1)

    // 此cell已经读完
    if r.readCell.r == r.cellSize {
        // 读指针归零,并将该cell状态置为非满
        r.readCell.r = 0
        r.readCell.fullFlag = false
        // 将readCell指向下一个cell
        r.readCell = r.readCell.next
    }

    return
}

// Pop 读一个元素,读完后移动指针
func (r *RingBuffer) Pop() (data T) {
    data, err := r.Read()
    if errors.Is(err, ErrIsEmpty) {
        panic(ErrIsEmpty.Error())
    }
    return
}

// Peek 窥视 读一个元素,仅读但不移动指针
func (r *RingBuffer) Peek() (data T) {
    if r.IsEmpty() {
        panic(ErrIsEmpty.Error())
    }

    // 仅读
    data = r.readCell.Data[r.readCell.r]
    return
}

// Write 写入数据
func (r *RingBuffer) Write(value T) {
    // 在 r.writeCell.w 位置写入数据,指针向右移动一位
    r.writeCell.Data[r.writeCell.w] = value
    r.writeCell.w++
    atomic.AddInt32(&r.count, 1)

    // 当前cell写满了
    if r.writeCell.w == r.cellSize {
        // 指针置0,将该cell标记为已满,并指向下一个cell
        r.writeCell.w = 0
        r.writeCell.fullFlag = true
        r.writeCell = r.writeCell.next
    }

    // 下一个cell也已满,扩容
    if r.writeCell.fullFlag == true {
        r.grow()
    }
}

// grow 扩容
func (r *RingBuffer) grow() {
    // 新建一个cell
    newCell := &cell{
        Data: make([]T, r.cellSize),
    }

    // 总共三个cell,writeCell,preCell,newCell
    // 本来关系: preCell <===> writeCell
    // 现在将newcell插入:preCell <===> newCell <===> writeCell
    pre := r.writeCell.pre
    pre.next = newCell
    newCell.pre = pre
    newCell.next = r.writeCell
    r.writeCell.pre = newCell

    // 将writeCell指向新建的cell
    r.writeCell = r.writeCell.pre

    // cell 数量加一
    r.cellCount++
}

// IsEmpty 判断RingBuffer是否为空
func (r *RingBuffer) IsEmpty() bool {
    return r.Len() == 0
}

// Capacity RingBuffer容量
func (r *RingBuffer) Capacity() int {
    return r.cellCount * r.cellSize
}

// Len RingBuffer数据长度
func (r *RingBuffer) Len() (count int) {
    count = int(r.count)
    return
}

// Reset 重置为仅指向两个cell的ring
func (r *RingBuffer) Reset() {
    // 没有数据切cellCount只有两个时,无需重置
    if r.count == 0 && r.cellCount == 2 {
        return
    }

    lastCell := r.readCell.next

    lastCell.w = 0
    lastCell.r = 0
    r.readCell.r = 0
    r.readCell.w = 0
    r.cellCount = 2
    r.count = 0

    lastCell.next = r.readCell
}

2. 无限缓存channel

package zchan

import (
    "github.com/zngw/ringbuffer"
)

type T interface{}

type ZChan struct {
    In     chan<- T               // 写入channel
    Out    <-chan T               // 读取channel
    buffer *ringbuffer.RingBuffer // 双向环形链表
}

// Len uc中总共的元素数量
func (uc *ZChan) Len() int {
    return len(uc.In) + uc.BufLen() + len(uc.Out)
}

// BufLen uc的buf中的元素数量
func (uc *ZChan) BufLen() int {
    return uc.buffer.Len()
}

// New 新建一个无限缓存的Channel,并指定In和Out大小(In和Out设置得一样大)
func New(initCapacity int) (ch *ZChan, err error) {
    rb, err := ringbuffer.NewRingBuffer(512)
    if err != nil {
        return
    }

    in := make(chan T, initCapacity)
    out := make(chan T, initCapacity)
    ch = &ZChan{In: in, Out: out, buffer: rb}

    go process(in, out, ch)

    return
}

// 内部Worker Goroutine实现
func process(in, out chan T, ch *ZChan) {
    defer close(out) // in 关闭,数据读取后也把out关闭

    // 不断从in中读取数据放入到out或者buf中
loop:
    for {
        // 第一步:从in中读取数据
        value, ok := <-in
        if !ok {
            // in 关闭了,退出loop
            break loop
        }

        // 第二步:将数据存储到out或者buf中
        if ch.buffer.Len() > 0 {
            // 当buf中有数据时,新数据优先存放到buf中,确保数据FIFO原则
            ch.buffer.Write(value)

        } else {
            // out 没有满,数据放入out中
            select {
            case out <- value:
                continue
            default:
            }

            // out 满了,数据放入buf中
            ch.buffer.Write(value)
        }

        // 第三步:处理buf,一直尝试把buf中的数据放入到out中,直到buf中没有数据
        for ch.buffer.Len() > 0 {
            select {
            // 为了避免阻塞in,还要尝试从in中读取数据
            case val, ok := <-in:
                if !ok {
                    // in 关闭了,退出loop
                    break loop
                }
                // 因为这个时候out是满的,新数据直接放入buf中
                ch.buffer.Write(val)

            // 将buf中数据放入out
            case out <- ch.buffer.Peek():
                ch.buffer.Pop()

                if ch.buffer.IsEmpty() {
                    ch.buffer.Reset()
                }
            }
        }
    }

    // in被关闭退出loop后,buf中还有可能有未处理的数据,将他们塞入out中,并重置buf
    for ch.buffer.Len() > 0 {
        out <- ch.buffer.Pop()
    }
}

3. 使用测试

package main

import (
    "fmt"
    "github.com/zngw/zchan"
    "time"
)

func main() {
    zc, err := zchan.New(4)
    if err != nil {
        panic(err.Error())
    }

    go func() {
        // 写入channel数据
        // 10毫秒写入1次
        for i := 0; i < 55; i++ {
            zc.In <- i
            fmt.Printf("写入数据:%v, chan长度:%v, Buf长度: %v \n", i, zc.Len(), zc.BufLen())
            time.Sleep(10 * time.Millisecond)
        }

        close(zc.In)
    }()

    for v := range zc.Out {
        // 20 毫毛读取一次数据
        fmt.Printf("读取入数据:%v, chan长度:%v, Buf长度:%v \n", v, zc.Len(), zc.BufLen())
        time.Sleep(20 * time.Millisecond)
    }
}

参考文献:
[1] https://blog.csdn.net/qq_39382769/article/details/122423070
[2] https://colobu.com/2021/05/11/unbounded-channel-in-go/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容