RabbitMQ实时获取队列级别生产和消费应用

背景

生产中有这么一些需求场景:

  • 业务方会问某一队列当前有哪些应用在发送或消费
  • 集群拆分时,队列上下游梳理
  • 监控告警时,不产生误告

方式1:人工通过RMQ插件Management管理端,只能看Channels、Consumers信息,很难从中大量的信息中提取某一队列对应的生产者和消费者相关信息

方式2:通过监控比如Cat看当前该队列MQ相关打点,可行但很容易漏掉。许多Task类型的应用不是时刻都有MQ打点

以上两种方式均很难满足要求,运维成本较高而且容易漏掉。线上一个实际存在的例子是:有一个队列的上游(发送方)有多达18个应用!

如何高效、准确地获取队列生产/消费应用?

思路

  • Producer,Consumer 通过 Channel 来和 Broker 通信,而 Channel 则是复用底层 Socket 连接。
  • 在Broker实现中,Channel 进程(发送或者消费)都会监视 monitor 队列主进程。
  • 通过rabbitmq_top插件相关模块获取 Channel 进程信息类似如下:


因此,如果我们能够获取该队列对应的发送和消费Channels进程信息,就可以提取出发送/消费应用的IP。

步骤

1、通过获取队列进程 Pid 的 monitored_by 信息,并利用 rabbitmq_top 插件相关模块从中提取出类型为 rabbit_channel 的进程 Chs:

{monitored_by,Res} = rpc:call(node(Pid), erlang, process_info, [Pid, monitored_by]),
Chs = lists:foldl(fun(E,Acc) -> case lists:keyfind(type,1,rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of 
                                        {type,rabbit_channel} -> [E|Acc];
                                        _ -> Acc end end, [], Res).

2、通过rabbitmq_management插件相关模块方法获取队列进程对应的消费者Channel进程 ConsumerChs:

Consumers4VHost = rpc:call(Node, erlang, apply, [fun() -> rabbit_mgmt_db:get_all_consumers(list_to_binary(Vhost)) end,[]]),
QName = Q#amqqueue.name#resource.name,
Consumers4Q = lists:filter(fun(T) -> case lists:keyfind(queue, 1, T) of 
                                                {queue,QRes} -> case lists:keyfind(name, 1, QRes)  of 
                                                                     {name,QName} -> true; 
                                                                     _ -> false  end; 
                                                 _ -> false  end end, Consumers4VHost),
ConsumerChs = lists:foldl(fun(E,Acc) -> case lists:keyfind(channel_pid, 1, E) of
                                                {channel_pid,Ch} -> [Ch|Acc];
                                                _ -> Acc end end, [], Consumers4Q).

步骤2改进
可以直接从队列主进程进程字典 中过滤出消费 Channel 进程信息:

QPid = Q#amqqueue.pid,
{dictionary, Res} = rpc:call(node(QPid), erlang, process_info, [QPid, dictionary]),
ConsumerChs = lists:foldl(fun(E,Acc) -> case E of 
                                  {{ch,ChPid},_} -> [ChPid|Acc];
                                  _ -> Acc end
                        end, [], Res).

3、生产者Channel进程 ProducerChs:

ProducerChs = Chs -- ConsumerChs.

4、这样我们获取了队列进程对应的生产者和消费者Channel进程列表,接下来从Channel进程信息中提取客户端IP就可以了:

lists:foldl(fun(E,Acc) -> case lists:keyfind(connection_name, 1, rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of 
                                  {connection_name,Conn} -> 
                                      ConnStr = binary_to_list(Conn),
                                      Ip = string:substr(ConnStr, 1, string:chr(ConnStr, $:) - 1),
                                      [Ip|Acc];
                                  _ -> Acc  end  end, [], ChsList).

最后通过去重上面的IP列表集合,得到该队列对应的生产者和消费者IP信息,通过调用相关接口反查IP对应的应用名即可。

相关拓展:

1、如何获取队列进程Pid?通过获取队列amqquque记录:

QRes = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
[Queue] = rpc:call(Node, rabbit_amqqueue, lookup, [[QRes]]).

2、如何调试上述功能?
利用Erlang shell,确保集群节点已启用management、top插件

3、生产环境实践:
Escript方式运行;比较好的方法是以Erlang Web Server的方式运行,并提供HTTP接口给集群管控后台

4、其他场景
僵死队列问题,队列进程迁移、修复,获取队列进程、delegate进程关键指标(如邮箱大小、占用内存)等,都可以利用Broker/Plugin既有模块代码进行解决,前提需要熟悉服务端相关模块代码,多进行调试。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容