nsqlookupd与nsq交互
nsqd带参数启动
- 除了接收pub发布的topic,还可以通过硬盘备份的文件恢复创建topic
- 在nsqd启动时,除了开启tcp、http服务与queueScanLoop超时检测协程,还会开启一个lookupLoop协程,负责向nsqlookupd注册
// Start builds the nsqd Options from the command-line arguments, creates the
// nsqd instance from them and calls its Main method.
// NOTE(review): this is an excerpt — cfg is declared in elided code and the
// remainder of the body (including the return) is replaced by "...".
func (p *program) Start() error {
opts := nsqd.NewOptions()
// Parse the command-line arguments and resolve them into the Options object.
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
options.Resolve(opts, flagSet, cfg)
// Create the nsqd instance from the resolved Options (the local variable
// shadows the nsqd package from here on) and run it.
nsqd := nsqd.New(opts)
nsqd.Main()
...
}
// Main restores the topics recorded in the on-disk metadata and then starts
// the background loops of this nsqd instance.
//
// If the metadata cannot be loaded the process is terminated via log.Fatalf.
func (n *NSQD) Main() {
	// Recreate the topics stored in the backup file. LoadMetadata is a method
	// of *NSQD, so it must be invoked on the receiver rather than on the
	// package name.
	if err := n.LoadMetadata(); err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}

	// Loop that scans for timed-out messages and processes them.
	n.waitGroup.Wrap(n.queueScanLoop)
	// Loop that registers this nsqd with the nsqlookupd instances selected by
	// the options.
	n.waitGroup.Wrap(n.lookupLoop)
	// The statsd loop is only started when a statsd address is configured.
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}
}
NSQD.LoadMetadata() 备份文件创建topic
- 在创建topic时,除了开启messagePump协程消费memoryMsgChan队列
- 还会调用nsqd.Notify(t),通过NSQD.notifyChan队列通知lookupLoop协程去注册该topic
// LoadMetadata recreates the topics and channels listed in the persisted
// metadata, restores their paused state and starts every restored topic.
// Topics or channels whose names fail protocol validation are logged at
// LOG_WARN and skipped. The visible code always returns nil.
// NOTE(review): this is an excerpt — the code that reads the file and fills m
// is elided, so fn is unused in the lines shown here.
func (n *NSQD) LoadMetadata() error {
// Locate the nsqd.dat on-disk backup file.
fn := newMetadataFile(n.getOpts())
// Create the topics described by the backup file.
for _, t := range m.Topics {
if !protocol.IsValidTopicName(t.Name) {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
// GetTopic returns the topic for this name (presumably creating it when it
// does not exist yet — confirm against GetTopic).
topic := n.GetTopic(t.Name)
if t.Paused {
topic.Pause()
}
// Restore every channel recorded for this topic.
for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
channel := topic.GetChannel(c.Name)
if c.Paused {
channel.Pause()
}
}
// Start the topic only after all of its channels have been restored.
topic.Start()
}
return nil
}
// NewTopic starts the topic's messagePump loop, notifies the owning nsqd about
// the new topic and returns it.
// NOTE(review): this is an excerpt — the construction of t from topicName, ctx
// and deleteCallback is elided.
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
// Run messagePump under the topic's wait group.
t.waitGroup.Wrap(t.messagePump)
// Notify nsqd about this topic so that it can be registered with nsqlookupd
// (Notify is also the path used when a topic is exiting).
// Per the surrounding notes, the notification is delivered through a
// notify channel that lookupLoop consumes — confirm against Notify.
t.ctx.nsqd.Notify(t)
return t
}
NSQD.lookupLoop 接收通知进行topic/channel注册
- 在nsqd启动时开启的lookupLoop线程会循环处理notifyChan队列发来的消息
// lookupLoop binds this nsqd to its nsqlookupd peers and keeps them informed
// about topic and channel changes.
//
// On every pass it first connects to any configured nsqlookupd address that is
// not tracked yet (only while connect is set), then blocks until either a
// *Topic / *Channel notification is received from n.notifyChan — which is
// forwarded to every peer as an nsq.Register or nsq.UnRegister command — or a
// receive on n.exitChan succeeds, which ends the loop.
//
// NOTE(review): connect, lookupPeers, lookupAddrs and hostname are declared in
// code that is elided from this excerpt.
func (n *NSQD) lookupLoop() {
	for {
		if connect {
			for _, host := range n.getOpts().NSQLookupdTCPAddresses {
				// Addresses that already have a peer are left alone.
				if in(host, lookupAddrs) {
					continue
				}
				n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
				// Build the peer from the options. Sample log output:
				//   LOOKUP connecting to 127.0.0.1:4160
				//   LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.1.0 BroadcastAddress:sz-linrundong}
				lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
					connectCallback(n, hostname))
				// A nil command only establishes the connection to nsqlookupd.
				lookupPeer.Command(nil) // start the connection
				lookupPeers = append(lookupPeers, lookupPeer)
				lookupAddrs = append(lookupAddrs, host)
			}
			n.lookupPeers.Store(lookupPeers)
			connect = false
		}

		select {
		// A topic or channel announced itself through n.notifyChan.
		case val := <-n.notifyChan:
			var cmd *nsq.Command
			var branch string

			// Bind the concrete value in the type switch instead of asserting
			// it a second time inside each case.
			switch v := val.(type) {
			case *Channel:
				// notify all nsqlookupds that a new channel exists, or that it's removed
				branch = "channel"
				if v.Exiting() {
					cmd = nsq.UnRegister(v.topicName, v.name)
				} else {
					// Assemble the register command.
					cmd = nsq.Register(v.topicName, v.name)
				}
			case *Topic:
				// notify all nsqlookupds that a new topic exists, or that it's removed
				branch = "topic"
				if v.Exiting() {
					cmd = nsq.UnRegister(v.name, "")
				} else {
					cmd = nsq.Register(v.name, "")
				}
			}

			// Send the command to every nsqlookupd peer; a failure is logged
			// and does not stop delivery to the remaining peers.
			for _, lookupPeer := range lookupPeers {
				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
				_, err := lookupPeer.Command(cmd)
				if err != nil {
					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
				}
			}
		case <-n.exitChan:
			goto exit
		}
	}

exit:
	n.logf(LOG_INFO, "LOOKUP: closing")
}
lookupPeer.Command 向nsqlookupd发送命令(例如注册请求)
// Command sends cmd to the peer and returns the response body read back from
// it, connecting first when the peer is not in stateConnected.
//
// While connecting it writes nsq.MagicV1 and, if the peer started out in
// stateDisconnected, invokes connectCallback. A nil cmd performs only this
// connection step and returns (nil, nil). On a failed write or read the peer
// is closed and the error is returned.
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
	// Remember the state on entry: the callback must only run for a peer that
	// was fully disconnected before this call.
	stateOnEntry := lp.state

	if lp.state != stateConnected {
		if connErr := lp.Connect(); connErr != nil {
			return nil, connErr
		}
		lp.state = stateConnected

		if _, magicErr := lp.Write(nsq.MagicV1); magicErr != nil {
			lp.Close()
			return nil, magicErr
		}

		if stateOnEntry == stateDisconnected {
			lp.connectCallback(lp)
		}
		// The state is re-checked after the callback; if it is no longer
		// stateConnected the callback is reported as failed.
		if lp.state != stateConnected {
			return nil, fmt.Errorf("lookupPeer connectCallback() failed")
		}
	}

	// Nothing to send: the caller only wanted the connection.
	if cmd == nil {
		return nil, nil
	}

	if _, writeErr := cmd.WriteTo(lp); writeErr != nil {
		lp.Close()
		return nil, writeErr
	}

	// Read the reply, bounded by the peer's maximum body size.
	body, readErr := readResponseBounded(lp, lp.maxBodySize)
	if readErr != nil {
		lp.Close()
		return nil, readErr
	}
	return body, nil
}