问题
业务反馈线上队列无消费者,但是management管理端发现有 Unacked 量
基本原理
- Unacked 消息指的是服务端已经投递给消费者,但还没有收到消费者 Ack 这么一个中间状态。
- 消费者如果在消费过程中(尚未给Broker响应Ack)退出,那么消费中的这部分Unacked 消息会被 requeue 到队列,进而投递到其他消费者(如果有 Consumers在线);如果没有 Consumers,那这部分 Unacked 消息会转换为 Ready 状态。
- Unacked 状态消息被 Broker 维护在内存中。
而上面截图的问题:队列有 Unacked 量,但是没有 Consumers 信息,似乎很矛盾。
定位
阅读服务端(v3.6.x
)消费相关模块代码,发现 rabbit_queue_consumers 模块:
计算Unacked消息量
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
读进程字典
all_ch_record() -> [C || {{ch, _}, C} <- get()].
写进程字典
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
rabbit_queue_consumers 模块实际是被 rabbit_amqqueue_process 调用。不难发现 队列主进程 的 进程字典 维护着消息消费过程中
的消费Channel进程等信息。
验证
- 启动Erlang shell
erl -name sj@test -setcookie $cookie
- 获取队列amqqueue记录信息,其中包含队列主进程pid等信息。
rpc:call(N,erlang,apply,[fun()->rabbit_amqqueue:lookup(rabbit_misc:r(<<"VHOST">>,queue,<<"QUEUE">>)) end,[]]).
- 获取队列主进程的进程字典信息。Pid为步骤
2
获取到的主进程Pid
rpc:call(node(Pid), erlang, process_info, [Pid,dictionary]).
最终发现了类似如下6条关键信息,与问题中的 Unacked 数量对应:
{{ch,<8232.16445.2870>},
{cr,<8232.16445.2870>,#Ref<8251.0.972816396.42542>,
{[{3,<<"amq.ctag-RkzjrIA60GwE9ED8opKK7w">>}],[]},
0,
{queue,[],[],0},
{qstate,<8232.16442.2870>,dormant,{0,nil}},
1}}
解决
已经能找到上述 Unacked 状态的6条消息对应的Channel进程,如何让这些消息重入队(成为Ready状态)?
发现 rabbit_amqqueue_process 模块 handle_ch_down/2 方法,该方法内部调用 rabbit_queue_consumers 模块 erase_ch/2 方法,
最终调用 erase/2 将 {ch, ChPid} 这个key从进程字典中移除。
因此,可以通过触发channel进程DOWN
exit(ChPid,normal).
最终队列进程感知 Channel 进程退出,进而将 Unacked 状态消息requeue,最终消息转变为 Ready 状态~
拓展
如何获取队列 Unacked 状态消息?
sys:get_state(Pid).
有兴趣可以调试下