nsqlookupd 用于收集 nsqd 上报的 topic 和 channel,
基于此对client的查询返回对应的nsqd node
然后 client 再对相应的 nsqd 进行订阅(SUB)。
本篇主要选取查询topic的片段
入口代码位于github.com/nsqio/nsq/apps/nsqlookupd
func (p *program) Start() error {
...
daemon := nsqlookupd.New(opts)
daemon.Main()
...
}
func (l *NSQLookupd) Main() {
....
// nsqd 通过此 TCP server 与 nsqlookupd 交互
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
tcpServer := &tcpServer{ctx: ctx}
protocol.TCPServer(tcpListener, tcpServer, l.logf)
// client 通过此 HTTP server 与 nsqlookupd 交互
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
httpServer := newHTTPServer(ctx)
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
....
}
func (p *tcpServer) Handle(clientConn net.Conn) {
....
prot = &LookupProtocolV1{ctx: p.ctx}
....
err = prot.IOLoop(clientConn)
....
}
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
...
response, err = p.Exec(client, reader, params)
...
}
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
...
// REGISTER 命令:nsqd 通过它把自己的 topic 同步(注册)到 nsqlookupd
case "REGISTER":
return p.REGISTER(client, reader, params[1:])
...
}
func newHTTPServer(ctx *Context) *httpServer {
....
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
}
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
// 处理 GET /lookup 请求,用于查询 topic
....
}