NSQ源码分析(1)-nsqd消息的生产

NSQ通过topic区分不同的消息队列,每个topic具有不同的channel,同一个topic下的每一个消息会被广播到每个channel中。

消息从生产者到消费者之路

nsq同时支持HTTP协议和TCP协议,客户端可以通过tcp经过特定的协议发布一个消息到nsq的指定topic,或者通过http协议的指定接口。

我们先来看一条消息由客户端发布到NSQ的topic会发生什么。

从topic到channel

下面是简单的流程图:

Alt text

无论是http还是tcp调用,都会调用nsqd/topic.go/Topic.PutMessage方法。内部会把它放入memoryMsgChan这个Buffered Channel。buffer的大小由配置设定,超过了buffer大小的消息会写入backend,即diskq。
至此,put消息的同步操作完成,剩下的工作由这个topic的协程异步完成,这个协程执行nsqd/topic.go/Topic.messagePump方法。这个方法的源码如下:

// messagePump从memoryMsgChan或者diskq里拿出message,并转发到这个topic下的每个Channel之中。
func (t *Topic) messagePump() {    
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan chan []byte

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan: //topic channels update
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case pause := <-t.pauseChan:
            if pause || len(chans) == 0 {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }
        
        //遍历所有订阅topic的channel
        for i, channel := range chans {
            chanMsg := msg
            // 除了第一个channel,都需要复制message,每个channel需要unique的消息
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

这段代码非常简单,但是这部分异步的操作不同于许多传统语言的实现,比如放到线程池里去执行一段代码。

NSQ的这种方式在高并发的环境下并没有加很多的锁,而是通过channel和单协程操作关键数据结构的方式实现。channel实现协程间的通信,每一个数据结构对象(需要高并发操作的一组相关数据)都会在创建之初启动一个维护协程(messagePump),负责用select监听其它协程发给这组结构的消息(包含需要对数据进行的操作),并在无竞争的情况下操作这组数据。这样的操作串行了所有对共享数据的所有操作,避免大量使用锁。需要注意的是,在这里,这些对数据的串行操作都是读写数据结构,还有写到其它channel做通信之类的操作,应当要避免特别耗时的计算或者同步的IO,否则会造成channel的阻塞。

这也是golang下并发开发的一种比较常见的范式,golang推荐的同步方式是通信,而不是共享内存,这种范式也是这种思想的体现。详细可以看Effective Go - Concurrency这部分怎么说:

Share by communicating
Concurrent programming is a large topic and there is space only for some Go-specific highlights here.
Concurrent programming in many environments is made difficult by the subtleties required to implement correct access to shared variables. Go encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. Data races cannot occur, by design. To encourage this way of thinking we have reduced it to a slogan:
Do not communicate by sharing memory; instead, share memory by communicating.
This approach can be taken too far. Reference counts may be best done by putting a mutex around an integer variable, for instance. But as a high-level approach, using channels to control access makes it easier to write clear, correct programs.
One way to think about this model is to consider a typical single-threaded program running on one CPU. It has no need for synchronization primitives. Now run another such instance; it too needs no synchronization. Now let those two communicate; if the communication is the synchronizer, there's still no need for other synchronization. Unix pipelines, for example, fit this model perfectly. Although Go's approach to concurrency originates in Hoare's Communicating Sequential Processes (CSP), it can also be seen as a type-safe generalization of Unix pipes.

我们也可以看到,NSQ代码也会用到锁,那么什么时候用锁,什么时候用channel呢?最简单的原则就是,哪种用起来自然就用哪一种,哪种简单用哪种,哪种效率高用哪种。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容

  • 经验靠积累,年龄越大积累越多,但这仅仅是理论,经验也是一种技能,要经过刻意练习,比如同样是下棋,有人可能成为大师,...
    a242022b9660阅读 348评论 0 1
  • 微博改变一切 导读:微博微信的崛起代表着中国的Web3.0时代的到来,其中微博可算是较早的探路者。社交通讯+社会化...
    L_alan阅读 957评论 0 1
  • “我的铁皮石斛!~” 后院又传来大姐的哀嚎。花鹊儿就知道,这一定又是离梦的杰作。 果然,不一会哀嚎声再起……不过,...
    商茹冰阅读 172评论 0 0
  • 学习一个新的技能的时候都要忘记自己是一个大专生、本科生、有着十年工作经验的职业经理人等等,忘记这些身份,开始学习。...
    懒虫的忧虑生活阅读 200评论 0 0
  • 生活地阳光,且孤独 不是有人陪伴就可以躲避的孤“毒”。 一杯敬月光️️️ ️http://m.kugou.com/...
    一白兔阅读 272评论 0 0