golang-nsq系列(二)--nsqd源码解析

上一篇初识了 nsq 三个模块(nsqd, nsqlookupd, nsqadmin)的 demo演示,本篇则从源码开始,一步一步去解析 nsqd 的执行流程和逻辑处理,学习别人优秀的项目架构,以期学以致用。

1. nsqd 执行入口

nsq/apps/nsqd/main.go 可以找到执行入口文件,如下:

nsqd-path

2. nsqd 执行主逻辑源码

2.1 通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqd 实例;

func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqd.NewOptions()

  flagSet := nsqdFlagSet(opts)
  flagSet.Parse(os.Args[1:])
  ...
}



2.2 初始化配置项(opts, cfg),加载历史数据(nsqd.LoadMetadata)、持久化最新数据(nsqd.PersistMetadata),然后开启协程,进入 nsqd.Main() 主函数;

options.Resolve(opts, flagSet, cfg)
  nsqd, err := nsqd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqd - %s", err)
  }
  p.nsqd = nsqd

  err = p.nsqd.LoadMetadata()
  if err != nil {
    logFatal("failed to load metadata - %s", err)
  }
  err = p.nsqd.PersistMetadata()
  if err != nil {
    logFatal("failed to persist metadata - %s", err)
  }

  go func() {
    err := p.nsqd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()



2.3 初始化 tcpServer, httpServer, httpsServer,然后循环监控队列信息(n.queueScanLoop)、节点信息管理(n.lookupLoop)、统计信息(n.statsdLoop)输出;

tcpServer := &tcpServer{ctx: ctx}
  n.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
  })
  httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
  n.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
  })
  if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
    httpsServer := newHTTPServer(ctx, true, true)
    n.waitGroup.Wrap(func() {
      exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
    })
  }

  n.waitGroup.Wrap(n.queueScanLoop)
  n.waitGroup.Wrap(n.lookupLoop)
  if n.getOpts().StatsdAddress != "" {
    n.waitGroup.Wrap(n.statsdLoop)
  }



2.4 分别处理 tcp/http 请求,开启 handler 协程进行并发处理,其中 newHTTPServer 注册路由采用了 Decorate 装饰器模式(后面会进一步解析);

http-Decorate路由分发:

router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
  s := &httpServer{
    ctx:         ctx,
    tlsEnabled:  tlsEnabled,
    tlsRequired: tlsRequired,
    router:      router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
  router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
  router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))

  // only v1
  router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
  router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))

tcp-handler 处理:

for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }



2.5 tcp 解析 V2 协议,走内部协议封装的 prot.IOLoop(conn) 进行处理;

var prot protocol.Protocol
  switch protocolMagic {
  case "  V2":
    prot = &protocolV2{ctx: p.ctx}
  default:
    protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)
  if err != nil {
    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    return
  }



2.6 通过内部协议进行 p.Exec(执行命令)、p.Send(发送结果),保证每个 nsqd 节点都能正确的进行消息生成与消费,一旦上述过程有 error 都会被捕获处理,确保分布式投递的可靠性。

params := bytes.Split(line, separatorBytes)

    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

    var response []byte
    response, err = p.Exec(client, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

      sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }

      // errors of type FatalClientErr should forceably close the connection
      if _, ok := err.(*protocol.FatalClientErr); ok {
        break
      }
      continue
    }

    if response != nil {
      err = p.Send(client, frameTypeResponse, response)
      if err != nil {
        err = fmt.Errorf("failed to send response - %s", err)
        break
      }
    }

3. nsqd 流程图小结

上述流程小结示意图如下:

nsqd.png

【小结】从源码可以看到,代码逻辑清晰明了,利用 Go 协程高效并发处理分布式多节点 nsqd 的消息生产与消费,里面有很多细节有待下一步仔细剖析,学以致用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容

  • 罗道文的私房菜这篇文章开始分析下NSQ源码;NSQ主要由三个部分nsqd、nsqlookupd、nsqadmin以...
    faunjoe阅读 917评论 0 1
  • 1. 介绍 最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基...
    aoho阅读 8,932评论 1 16
  • 罗道文的私房菜nsqd是NSQ中最重要的组件,接收生产者的消息以及给消费者发送消息都由nsqd完成。因此,这篇文章...
    faunjoe阅读 1,156评论 0 0
  • 1. 开始 开篇罗嗦了一大堆,终于开始进入正题了。golang的优秀源码很多,比如杀手级应用docker,goog...
    kekemuyu阅读 2,817评论 1 4
  • 一、NSQ部署 我个人电脑是win10 64位,因此在这儿我就给出一个网友写好的Windows下NSQ部署文章NS...
    faunjoe阅读 820评论 0 0