nsq-nsqd

一般nsqd是部署在靠近client的,甚至是本机,用于收集client的pub信息

nsqd会吧topic和channel信息持久化,并同步到nsqlookup


入口代码位于github.com/nsqio/nsq/apps/nsqd.go

func (p *program) Start() error {

                ...

                nsqd := nsqd.New(opts)

                加载旧的metadata(topic,channnel)

                err := nsqd.LoadMetadata()

                过滤ephemeral的topic和channel后进行持久化

                err = nsqd.PersistMetadata()

                启动nsqd

                nsqd.Main()

            ...

}

此处我们只看tcpserver(它用于client创建pub,sub等)

func (n *NSQD) Main() {

                ...

                tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)

                tcpServer := &tcpServer{ctx: ctx}

                protocol.TCPServer(n.tcpListener, tcpServer, n.logf)

                ...        

                用于同步本nsqd的topic,channel到nsqlookupd

                n.waitGroup.Wrap(func() { n.lookupLoop() })

                ...

}



func (p *tcpServer) Handle(clientConn net.Conn) {

                ...

                 var prot protocol.Protocol

                ...

                 prot = &protocolV2{ctx: p.ctx}

                进入读取命令

                err = prot.IOLoop(clientConn)

                ...

}


func (p *protocolV2) IOLoop(conn net.Conn) error {

                ...

                line, err = client.Reader.ReadSlice('\n')

                params := bytes.Split(line, separatorBytes)

                进入命令解析 

               response, err = p.Exec(client, params)

                ...

}



func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {

                ...    

                case bytes.Equal(params[0], []byte("PUB")):

                                进入发布命令

                                 return p.PUB(client, params)

...

}


func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {

                ...

                获取topic

                topic := p.ctx.nsqd.GetTopic(topicName)

                创建消息

                msg := NewMessage(topic.GenerateID(), messageBody)

                持久化消息

                err = topic.PutMessage(msg)

                ...

}


func (n *NSQD) GetTopic(topicName string) *Topic {

                ...

                新建topic

                 t = NewTopic(topicName, &context{n}, deleteCallback)

                从nsqlookup查询所有此topic下的channel

                channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)

                 for _, channelName := range channelNames {

                            if strings.HasSuffix(channelName, "#ephemeral") {    

                            // we don't want to pre-create ephemeral channels

                            // because there isn't a client connected

                           continue

                    }

                ...

}


func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {

                ...

                用于持久化    

                t.backend = diskqueue.New(

                ...)

                ...

                通知创建topic成功

                t.ctx.nsqd.Notify(t)

                ...

}

func (n *NSQD) Notify(v interface{}) {

                ...

                case n.notifyChan <- v:

                ...

                     if !persist {

                         return

                   }

                    n.Lock()

                    err := n.PersistMetadata()

                    持久化topic

                    ...        

}



func (t *Topic) PutMessage(m *Message) error {

                ...

                err := t.put(m)

                ...

}


func (t *Topic) put(m *Message) error {

                ...    

                    err := writeMessageToBackend(b, m, t.backend)

                    ...

}


 func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {

                 buf.Reset()

                 _, err := msg.WriteTo(buf)

                    if err != nil {

                         return err

                     }

                return bq.Put(buf.Bytes())

 }


 func (d *diskQueue) Put(data []byte) error {

     d.RLock()

     defer d.RUnlock()


     if d.exitFlag == 1 {

         return errors.New("exiting")

     }


     d.writeChan <- data

     return <-d.writeResponseChan

 }


func (d *diskQueue) ioLoop() {

...

 case dataWrite := <-d.writeChan:

              真正的持久化入口

             d.writeResponseChan <- d.writeOne(dataWrite)

...

}


func (n *NSQD) lookupLoop() {

...

                通过此方法来更新nsqlookupd重连后nsqlookupd的topic,channel信息

                 lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,

                    connectCallback(n, hostname, syncTopicChan))

...

select {

        定期ping nsqlookupd

         case <-ticker:

            for _, lookupPeer := range lookupPeers {

                n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)

                 cmd := nsq.Ping()

                 _, err := lookupPeer.Command(cmd)

                 if err != nil {

                     n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)

                 }


                ...

                同步topic到nsqlookup

                   case val := <-n.notifyChan:

                switch val.(type) {

                ...

                case *Topic:

                ...

                topic := val.(*Topic)

                 if topic.Exiting() == true {

                     cmd = nsq.UnRegister(topic.name, "")

                } else {

                     cmd = nsq.Register(topic.name, "")

                 }

             }


            for _, lookupPeer := range lookupPeers {


                _, err := lookupPeer.Command(cmd)


...

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容