Golang CSP模型基础库

Go语言的哲学思想之一为:Do not communicate by sharing memory; instead, share memory by communicating. 翻译成中文即是:不要通过共享内存进行通信;取而代之,应该通过通信来共享内存。
为了支持这种哲学,Go语言提供了channel(通道)。
想当初,我刚看到这种哲学的时候觉得特别好,但是在具体实现代码的时候,对其理解却不是很深刻。在项目中,我总是把channel当作生产者-消费者模型中的消息队列来进行使用。这样对于在不同的goroutine中传输数据是很方便的。
但是对于共享内存数据,我一直使用的是传统的锁方式,而在Go语言中,常用的就是Mutex和RWMutex。对于从C/C++/Java/C#转型到Go语言的程序员而言,使用Mutex/RWMutex是非常直观的。我也尝试过使用channel来共享数据,但是觉得代码非常地繁琐,于是就放弃了。
但是随着在越来越多的项目中使用Go,以及越来越多的同事使用Go;在其它语言中使用锁的常见问题也在Go里面暴露出来了。于是,我转而继续思想使用channel来实现内存数据的共享。
最近在使用Erlang,我从中得到不少的灵感。因为Erlang是一门只提供通过发送消息来共享内存的语言,而没有提供传统的锁方式。在Erlang中,OTP框架中提供了一个叫做gen_server的behaviour,而这可以大大地简化逻辑的实现。
于是,我使用Go语言的channel实现了一个类似的版本。
代码如下:
baseModel.go

/*
This package is designed to simplify the use of csp model.
In csp model, we use channel to communicate among goroutines.
But it's not easy and error prone to implement the logic every time.
So, this package is designed as a template to use.
*/
package cspBaseModel

import (
    "context"
    "fmt"
)

// Request type is defined as the message exchanged between higher layer and the base model.
type Request struct {
    Name      string      // The identity of a request. Which is used to find the callback function.
    Parameter interface{} // The data sent to the callback function.
    ReturnCh  interface{} // A channel used to pass message between the request method and the callback function.
}

func NewRequest(name string, parameter, returnCh interface{}) *Request {
    return &Request{
        Name:      name,
        Parameter: parameter,
        ReturnCh:  returnCh,
    }
}

// BaseModel type is the core type
type BaseModel struct {
    callbackMap    map[string]func(*Request) // This callbackMap stores all the name and callback function pairs.
    RequestChannel chan *Request             // This channel is used to receive all the request from higher layer functions.
}

// Higher layer modules can register name and callback function pairs to base model.
func (this *BaseModel) Register(name string, callback func(*Request)) {
    if _, exists := this.callbackMap[name]; exists {
        panic(fmt.Sprintf("Item with name:%s has already existed.", name))
    }
    this.callbackMap[name] = callback
}

// Higher layer modules start base model to handle all the requests.
// ctx is used to stop this goroutine, to avoid goroutine leak.
func (this *BaseModel) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case requestObj := <-this.RequestChannel:
                if callback, exists := this.callbackMap[requestObj.Name]; !exists {
                    panic(fmt.Sprintf("There is no callback related to %s", requestObj.Name))
                } else {
                    callback(requestObj)
                }
            case <-ctx.Done():
                return
            }
        }
    }()
}

func NewBaseModel() *BaseModel {
    return &BaseModel{
        callbackMap:    make(map[string]func(*Request), 16),
        RequestChannel: make(chan *Request, 64),
    }
}

baseModel_test.go

package cspBaseModel

import (
    "context"
    "fmt"
    "sync"
    "testing"
)

type Player struct {
    Id    int
    Name  string
    Level int
}

func (this *Player) Equal(other *Player) bool {
    return this.Id == other.Id && this.Name == other.Name && this.Level == other.Level
}

func (this *Player) String() string {
    return fmt.Sprintf("{Id:%d, Name:%s, Level:%d}", this.Id, this.Name, this.Level)
}

func NewPlayer(id int, name string, level int) *Player {
    return &Player{
        Id:    id,
        Name:  name,
        Level: level,
    }
}

// Use mutex as the way to prevent concurrency
type MutexPlayer struct {
    mutex     sync.RWMutex
    playerMap map[int]*Player
}

func (this *MutexPlayer) GetPlayerById(id int) (*Player, bool) {
    this.mutex.RLock()
    defer this.mutex.RUnlock()
    playerObj, exists := this.playerMap[id]
    return playerObj, exists
}

func (this *MutexPlayer) GetPlayerList() []*Player {
    this.mutex.RLock()
    defer this.mutex.RUnlock()
    playerList := make([]*Player, 0, len(this.playerMap))
    for _, value := range this.playerMap {
        playerList = append(playerList, value)
    }
    return playerList
}

func (this *MutexPlayer) AddPlayer(playerObj *Player) {
    this.mutex.Lock()
    defer this.mutex.Unlock()
    this.playerMap[playerObj.Id] = playerObj
}

func NewMutexPlayer() *MutexPlayer {
    return &MutexPlayer{
        playerMap: make(map[int]*Player, 1024),
    }
}

// Use csp as the way to prevent concurrency
type CspPlayer struct {
    playerMap    map[int]*Player
    baseModelObj *BaseModel
    cancel       context.CancelFunc
}

func NewCspPlayer() *CspPlayer {
    ctx, cancel := context.WithCancel(context.Background())
    cspPlayerObj := &CspPlayer{
        playerMap:    make(map[int]*Player, 1024),
        baseModelObj: NewBaseModel(),
        cancel:       cancel,
    }
    cspPlayerObj.baseModelObj.Start(ctx)

    // Register callback
    cspPlayerObj.baseModelObj.Register("GetPlayerById", cspPlayerObj.getPlayerByIdCallback)
    cspPlayerObj.baseModelObj.Register("GetPlayerList", cspPlayerObj.getPlayerListCallback)
    cspPlayerObj.baseModelObj.Register("AddPlayer", cspPlayerObj.addPlayerCallback)

    return cspPlayerObj
}

func (this *CspPlayer) Stop() {
    this.cancel()
}

type GetPlayerByIdResponse struct {
    Value  *Player
    Exists bool
}

func (this *CspPlayer) GetPlayerById(id int) (*Player, bool) {
    retCh := make(chan *GetPlayerByIdResponse)
    this.baseModelObj.RequestChannel <- NewRequest("GetPlayerById", id, retCh)
    responseObj := <-retCh
    return responseObj.Value, responseObj.Exists
}

func (this *CspPlayer) getPlayerByIdCallback(requestObj *Request) {
    id, _ := requestObj.Parameter.(int)
    playerObj, exists := this.playerMap[id]
    getPlayerByIdResponseObj := &GetPlayerByIdResponse{
        Value:  playerObj,
        Exists: exists,
    }
    retCh, _ := requestObj.ReturnCh.(chan *GetPlayerByIdResponse)
    retCh <- getPlayerByIdResponseObj
}

type GetPlayerListResponse struct {
    Value []*Player
}

func (this *CspPlayer) GetPlayerList() []*Player {
    retCh := make(chan *GetPlayerListResponse)
    this.baseModelObj.RequestChannel <- NewRequest("GetPlayerList", nil, retCh)
    responseObj := <-retCh
    return responseObj.Value
}

func (this *CspPlayer) getPlayerListCallback(requestObj *Request) {
    playerList := make([]*Player, 0, len(this.playerMap))
    for _, value := range this.playerMap {
        playerList = append(playerList, value)
    }
    getPlayerListResponseObj := &GetPlayerListResponse{
        Value: playerList,
    }

    retCh, _ := requestObj.ReturnCh.(chan *GetPlayerListResponse)
    retCh <- getPlayerListResponseObj
}

func (this *CspPlayer) AddPlayer(playerObj *Player) {
    this.baseModelObj.RequestChannel <- NewRequest("AddPlayer", playerObj, nil)
}

func (this *CspPlayer) addPlayerCallback(requestObj *Request) {
    playerObj, _ := requestObj.Parameter.(*Player)
    this.playerMap[playerObj.Id] = playerObj
}

func TestMutexPlayer(t *testing.T) {
    mutexPlayerObj := NewMutexPlayer()
    playerObj := NewPlayer(1, "Jordan", 100)
    mutexPlayerObj.AddPlayer(playerObj)
    playerList := mutexPlayerObj.GetPlayerList()
    if len(playerList) != 1 {
        t.Fatalf("Expected %d items in the list, now got:%d", 1, len(playerList))
    }

    gotPlayerObj, exists := mutexPlayerObj.GetPlayerById(1)
    if !exists {
        t.Fatalf("Expected:%v, Got:%v", true, exists)
    }
    if gotPlayerObj.Equal(playerObj) == false {
        t.Fatalf("Expected:%s, Got:%s", playerObj, gotPlayerObj)
    }
}

func TestCspPlayer(t *testing.T) {
    cspPlayerObj := NewCspPlayer()
    playerObj := NewPlayer(1, "Jordan", 100)
    cspPlayerObj.AddPlayer(playerObj)
    playerList := cspPlayerObj.GetPlayerList()
    if len(playerList) != 1 {
        t.Fatalf("Expected %d items in the list, now got:%d", 1, len(playerList))
    }

    gotPlayerObj, exists := cspPlayerObj.GetPlayerById(1)
    if !exists {
        t.Fatalf("Expected:%v, Got:%v", true, exists)
    }
    if gotPlayerObj.Equal(playerObj) == false {
        t.Fatalf("Expected:%s, Got:%s", playerObj, gotPlayerObj)
    }
}

func BenchmarkMutexPlayer(b *testing.B) {
    mutexPlayerObj := NewMutexPlayer()
    for i := 0; i < b.N; i++ {
        mutexPlayerObj.AddPlayer(NewPlayer(i, fmt.Sprintf("Player%d", i), i))
        mutexPlayerObj.GetPlayerById(i)
        mutexPlayerObj.GetPlayerList()
    }
}

func BenchmarkCspPlayer(b *testing.B) {
    cspPlayerObj := NewCspPlayer()
    for i := 0; i < b.N; i++ {
        cspPlayerObj.AddPlayer(NewPlayer(i, fmt.Sprintf("Player%d", i), i))
        cspPlayerObj.GetPlayerById(i)
        cspPlayerObj.GetPlayerList()
    }
    cspPlayerObj.Stop()
}

对于外部的使用者,使用Mutex/RWMutex和使用channel,两者没有任何区别。所以可以很方便地进行实现,以及内部的改造。

详细代码,可以访问:https://github.com/Jordanzuo/goutil/tree/master/cspBaseModel

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容