突然被拉进一个群组,消息咚咚咚的来了,打开一看是个生产问题:MQ数据猛增,一天发了3T+消息,瞬间惊呆。。。默默在想,可不是我搞出来的吧?震惊的同时抓紧查缘由。再此之前,除了大神的分享,自己平时的代码查看,可没接触过rabbitmq啊 无从下手中。。。
天赋很重要,然而我有啊!
开始分析原因,在rabbitmq后台管理界面查询为啥消息量那么大呢,在众多队列中突然发现一个有意思的:
内存就占用了2个G,罪魁祸首就是你了。
找出队列pporder_tradeRetry,继续往下找,啥也不知道就全局搜索咯。找出一个王二小:
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="3">
<rabbit:listener queues="pporder_tradeRetry" ref="tradeRetryListener"/>
</rabbit:listener-container>
<!-- 队列消费者 -->
<bean id="tradeRetryListener" class="com.yeepay.g3.core.airpay.order.service.mq.listener.TradeRetryListener"/>
扭头发现方向不对,这是个消费者啊,得找出真凶生产者才行emmmmmm。。。
继续老办法,全局搜索,哈哈,这回来了一个王小二:
<rabbit:topic-exchange name="payplus.order.business.status.exchange" durable="true" auto-delete="false" id="businessChangeExchange">
<rabbit:bindings>
<!-- 交易重试 -->
<rabbit:binding pattern="*.RETRY" queue="pporder_tradeRetry"/>
</rabbit:bindings>
</rabbit:topic-exchange>
找出来了,但是
- topic-exchange是什么鬼。。。
- rabbit:binding又是啥
不过这个pattern="*.RETRY"能看懂点,正则表达式匹配嘛
傻瓜程序员解决方法:全局搜索。。。哇咔咔,开始搜“.RETRY”
功夫不负有心人,找到了。开始分析。。。
分析啥啊,啥也不懂,先学习学习rabbitMQ再来吧
开始正文解释rabbitMQ
背景
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写巴拉巴拉的不说了,自己看,自己查,自己百度,额好像有人跟我说合格的程序员都应该COOGLE的
应用场景
应用场景有必要说一下
异步处理
应用解耦
流量削峰
名词解释
最关键的东西来了
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
ConnectionFactory:Connection的制造工厂。
Connection:就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。
Channels:虚拟连接。它建立在上述的TCP连接中,数据流动都是在Channel中进行的。我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Virtual Host:一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,是权限控制的最小粒度,类似namespace。
Producer:生产者,数据的发送方。
Consumer:消费者,数据的接收方。
Queue:Queue(队列)是RabbitMQ的内部对象,用于存储消息。RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者从Queue中获取消息并消费。
Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Routing key:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
Binding key: RabbitMQ中通过Binding key将Exchange与Queue关联起来
交换机的四种模式:
- direct:转发消息到 routigKey 指定的队列。
- topic:对 key 进行模式匹配,比如ab可以传到到所有 ab 的 queue。
- headers:(这个还没有接触到)
- fanout:转发消息到所有绑定队列,忽略 routigKey
了解了这些开始分析一下那个让人胆战心惊的生产问题的缘由,123走:
//放入MQ队列 其中routerKey=WITHDRAW.RETRY
rabbitTemplate.convertAndSend(routerKey, tradeMqEntity);
topic-exchange根据routerKey进行模式匹配到队列pporder_tradeRetry
注意这个队列的定义
<rabbit:queue name="pporder_tradeRetry" durable="true" auto-delete="false" exclusive="false">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="payplus.order.dead.letter.exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
传说中的死信队列,解释一下什么叫做死信队列
DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange
进入死信队列(进入死信的三种方式):
1.消息被拒绝(basic.reject or basic.nack)并且requeue=false
2.消息TTL过期
3.队列达到最大长度
查询日志发现由于某些原因队列pporder_tradeRetry的消费者果然选择了拒绝操作
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
由于消息被拒绝,满足死信队列的进入姿势1,于是消息被转发到交换机payplus.order.dead.letter.exchange 查看这个交换机绑定到队列pporder_retryDeadLetterQueue
<rabbit:queue name="pporder_retryDeadLetterQueue" durable="true" auto-delete="false" exclusive="false" >
<rabbit:queue-arguments>
<!-- 计时单位为毫秒 -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
<entry key="x-dead-letter-exchange" value="payplus.order.business.status.exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
哇,怎么查都没查到这个队列的消费者,再仔细看看。x-message-ttl这个是啥,好像是个时间。查查发现,是消息TTL过期时间,满足死信队列进入姿势2,又回去一开始的payplus.order.business.status.exchange回去了,于是一直循环一直循环。。。循环到了宇宙毁灭。。。