NSQD 源码分析

启动流程

  1. 利用 svc 创建一个常驻进程
  2. 读取配置,优先级:命令行、配置文件、默认配置
  3. 实例化 nsqd,初始化日志、http client,数据文件锁等
  4. 读取元数据,就是数组的 topics 并且包含里面的 channels,每个都有是否暂停的标志位
  5. 协程启动 tcp server, http[s] server , queueScanLoop, lookupLoop,statsdLoop
  6. 等待信号执行关闭 tcp server 和 http & https server,持久化元数据(topic & channel & version,存储格式 json)

TCP Server

  • 功能:
  1. 消费者:客户端初始化,开始 messagePump 到订阅的 channel
  2. 生产者:客户端初始化,推消息,创建 topic,topic 内部 messagePump 发往 channel
  • 生产流程:读取协议标识 V2 (4个字节),等待 identify、pub|mpub|dpub 写消息

  • 消费流程:读取协议标识 V2 (4个字节),等待 identify、sub、rdy 命令(标识、订阅,准备)。监听内存队列和磁盘队列把数据传输给客户端

  • 命令:

  1. IDENTIFY: 客户端连接上后,标识自己,同时得到服务端的信息
  2. FIN: 消息顺利处理完了,里面会 in-flight 的消息清理掉,rdy 加1, 还有其他统计数据
  3. RDY: 流控 客户端告知我有能力处理多少个消息,服务器端就浪多少呗
  4. REQ: 消息重新进队列
  5. PUB, MPUB, DPUB: 写消息, 优先写内存队列,写不进写 backend 队列,使用 select 模式,新的 topic 会 messagePump,很简单从内存和磁盘读,循环 channel 发送 msg,每个客户端的 messagePump 获取
  6. NOP:空指令什么也不干
  7. TOUCH: 重置消息超时时间, pop queue 放到最后去,受到 max msg timeout 限制
  8. SUB:订阅命令
  9. CLS: 关闭
# 消息格式,用于磁盘存储
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
|       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
|       8-byte         ||    ||                 16-byte                      || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp      ^^                   message ID                       message body                        
                       (uint16)
                        2-byte
                       attempts

HTTP Server

  1. /ping 健康检查
  2. /info 打印一些配置信息,主机名,版本号,启动时间
  3. /pub & /mpub 生产消息,写内存,如果超配置数量写磁盘,没有消费者连接写磁盘
  4. /stats 统计信息,很详细
  5. /topic/create 创建 topic,内部会从 lookupd 同步 channel
  6. /topic/delete 删除 topic,未消费的消息存入磁盘,循环 channels,未消费的消息存入磁盘
  7. /topic/empty 清空 topic,内存和磁盘都清空
  8. /topic/pause & /topic/unpaus 中断或开启 topic,内部其实就是把两个管道架空,重新存储下元数据
  9. /channel/* channel 的创建,删除,清空,中断,开启。和 topic 类似就是换个结构体
  10. /config/:opt 取得配置信息,动态修改一些配置,nslookup、日志级别等
  11. /debug/pprof pprof debug 信息

queueScanLoop

  • 功能:将把在 InFlightQueue 和 DeferredQueue 过期的消息重新放入队列
  • 算法:用的是 redis 概率过期算法,维护一个队列扫描池,抽取部分 channel(默认25%) 进行扫描
  • 有超 25% 的 channel 是超时脏的,就没有休息的接着干呗
  • 注意:Message 对象的 pri 字段存的是过期时间
  • 注意:Pqueue ( priority 优先级队列)堆排序,分别用作延迟消息和消息的at least once机制。

lookupLoop

  • 功能:连接所有的 nsqlookupd 并同步 topic channel 信息的
  • 细节:创建连接,发送 V1,发送 Identify 指令,同步当前的 topic 和 channel,定期 ping 维护状态
  • 可动态配置 nsqlookupd,当 topic & channel 有变动时及时通知变更

statsdLoop

  • 功能:实时通过 udp 发送统计信息
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容