背景
生产中有这么一些需求场景:
- 业务方会问某一队列当前有哪些应用在发送或消费
- 集群拆分时,队列上下游梳理
- 监控告警时,不产生误告
方式1:人工通过RMQ插件Management管理端,只能看Channels、Consumers信息,很难从中大量的信息中提取某一队列对应的生产者和消费者相关信息
方式2:通过监控比如Cat看当前该队列MQ相关打点,可行但很容易漏掉。许多Task类型的应用不是时刻都有MQ打点
以上两种方式均很难满足要求,运维成本较高而且容易漏掉。线上一个实际存在的例子是:有一个队列的上游(发送方)有多达18个应用!
如何高效、准确地获取队列生产/消费应用?
思路
- Producer,Consumer 通过 Channel 来和 Broker 通信,而 Channel 则是复用底层 Socket 连接。
- 在Broker实现中,Channel 进程(发送或者消费)都会监视
monitor
队列主进程。 -
通过rabbitmq_top插件相关模块获取 Channel 进程信息类似如下:
因此,如果我们能够获取该队列对应的发送和消费Channels进程信息,就可以提取出发送/消费应用的IP。
步骤
1、通过获取队列进程 Pid 的 monitored_by
信息,并利用 rabbitmq_top 插件相关模块从中提取出类型为 rabbit_channel 的进程 Chs:
{monitored_by,Res} = rpc:call(node(Pid), erlang, process_info, [Pid, monitored_by]),
Chs = lists:foldl(fun(E,Acc) -> case lists:keyfind(type,1,rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of
{type,rabbit_channel} -> [E|Acc];
_ -> Acc end end, [], Res).
2、通过rabbitmq_management插件相关模块方法获取队列进程对应的消费者Channel进程 ConsumerChs:
Consumers4VHost = rpc:call(Node, erlang, apply, [fun() -> rabbit_mgmt_db:get_all_consumers(list_to_binary(Vhost)) end,[]]),
QName = Q#amqqueue.name#resource.name,
Consumers4Q = lists:filter(fun(T) -> case lists:keyfind(queue, 1, T) of
{queue,QRes} -> case lists:keyfind(name, 1, QRes) of
{name,QName} -> true;
_ -> false end;
_ -> false end end, Consumers4VHost),
ConsumerChs = lists:foldl(fun(E,Acc) -> case lists:keyfind(channel_pid, 1, E) of
{channel_pid,Ch} -> [Ch|Acc];
_ -> Acc end end, [], Consumers4Q).
步骤2改进
:
可以直接从队列主进程
的 进程字典
中过滤出消费 Channel 进程信息:
QPid = Q#amqqueue.pid,
{dictionary, Res} = rpc:call(node(QPid), erlang, process_info, [QPid, dictionary]),
ConsumerChs = lists:foldl(fun(E,Acc) -> case E of
{{ch,ChPid},_} -> [ChPid|Acc];
_ -> Acc end
end, [], Res).
3、生产者Channel进程 ProducerChs:
ProducerChs = Chs -- ConsumerChs.
4、这样我们获取了队列进程对应的生产者和消费者Channel进程列表,接下来从Channel进程信息中提取客户端IP就可以了:
lists:foldl(fun(E,Acc) -> case lists:keyfind(connection_name, 1, rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of
{connection_name,Conn} ->
ConnStr = binary_to_list(Conn),
Ip = string:substr(ConnStr, 1, string:chr(ConnStr, $:) - 1),
[Ip|Acc];
_ -> Acc end end, [], ChsList).
最后通过去重上面的IP列表集合,得到该队列对应的生产者和消费者IP信息,通过调用相关接口反查IP对应的应用名即可。
相关拓展:
1、如何获取队列进程Pid?通过获取队列amqquque
记录:
QRes = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
[Queue] = rpc:call(Node, rabbit_amqqueue, lookup, [[QRes]]).
2、如何调试上述功能?
利用Erlang shell,确保集群节点已启用management、top插件
3、生产环境实践:
Escript方式运行;比较好的方法是以Erlang Web Server的方式运行,并提供HTTP接口给集群管控后台
4、其他场景
僵死队列问题,队列进程迁移、修复,获取队列进程、delegate进程关键指标(如邮箱大小、占用内存)等,都可以利用Broker/Plugin既有模块代码进行解决,前提需要熟悉服务端相关模块代码,多进行调试。