NewConsumer
根据 topic、channel、config 实例化 Consumer,内部会验证配置。最后启动rdyLoop
,计时器(默认 5s)执行redistributeRDY
用来触发maybeUpdateRDY
重新均衡分配,最理想的情况是MaxInFlight / len(conns)
。AddConcurrentHandlers
创建多个协程处理消息,循环从 Consumer 的incomingMessages
通道获取消息,判断超过MaxAttempts
直接发送FIN
指令。 调用自定义的处理函数,如果返回错误发送REQ
指令,否则发送FIN
指令。前提都是打开 AutoResponse。默认的REQ
都是带backoff
的( 当自定义的处理器返回 err 时,消息重新入队列。 backoff 避让机制会触发,让本连接在一段时间内不再接受消息 )。根据 nsqlookupd 的配置,循环连接
ConnectToNSQLookupd
,内部会验证地址,去重,调用 lookup 接口获取所有的 nsqd 地址,获取到所有 nsqd 后循环连接 nsq。并启动一个 lookupdLoop 协程,间隔一段时就重复一次。ConnectToNSQD
初始化连接后,内部会启动 WriteLoop 和 ReadLoop 2 个协程。readLoop 循环读取并 unpack 数据,接受心跳请求,接受关闭请求,解码消息,通过 incomingMessages 管道传递消息,并尝试更新 maybeUpdateRDY,接受 err 返回 。
writeLoop 负责通过 msgResponseChan 返回消息的处理结果,里面会处理 backoff 计数、时长、恢复,还负责发送指令。执行 sub 订阅命令,循环所有 conn 执行 maybeUpdateRDY。
- 发送
MagicV2
协议头 - 发送
IDENTIFY
指令 - 发送
SUB
指令,订阅 topic - NSQD 推 messages 给到 consumer
- 发送 message 的返回指令,可以是
REQ
,TOUCH
,FIN
maybeUpdateRDY
RDY
是 nsqd 推送消息流量控制的核心。会在创建连接时、接受到消息时触发,里面有 3 个情况会发送 RDY
指令。
remain := conn.RDY()
lastRdyCount := conn.LastRDY()
count := r.perConnMaxInFlight()
// refill when at 1, or at 25%, or if connections have changed and we're imbalanced
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
conn, count, remain, lastRdyCount)
r.updateRDY(conn, count)
}
- 剩余量太少
- 消费得很快
- 超量了