一、延迟消息适应场景
一般延迟队列用于特定事件发生后隔一段时间需要做特定处理的场景,下面举几个常见的栗子
1.电商系统中,若用户下单后30min不支付,自动取消订单
2.用户登录APP浏览特定商品20min后还没下单,自动推送商品评测信息的消息并发放商品相关优惠券
二、rabbitMQ的延迟消息
Rabbitmq本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
1.通过Rabbitmq本身队列的特性来实现,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)
2.在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上
一个个来看
三、rabbitMQ不使用插件实现延迟消息
3.1 原理图解
若想不借助插件实现rabbitMQ的延迟消息,实际就是利用一个没有消费者的Queue1,等待消息过期后,通过交换机转发到Queue2来进行消费,消息的延迟时间就是消息在Queue1中的存活时间
3.2 在一个传统的spring项目中配置rabbitMQ的延迟消息
3.2.1 创建一个自动过期的消息队列(Queue1)
此队列没有消费类,可以通过x-message-ttl设置消息过期的时间,默认单位是毫秒,通过x-dead-letter-exchange设置死信后转发的交换机,通过x-dead-letter-routing-key来设置死信交换机中,真正需要转发的绑定的key,例如一个延迟5分钟的延迟队列的配置可以为:
<rabbit:queue id="delayFiveMinuteQueue" durable="true" auto-delete="false" exclusive="false" name="delayFiveMinuteQueue">
<rabbit:queue-arguments>
<!-- 设置延迟队列的过期时间 -->
<entry key="x-message-ttl" value="300000" value-type="java.lang.Integer"/>
<entry key="x-dead-letter-exchange" value="mq-exchange" />
<entry key="x-dead-letter-routing-key" value="delayConsumerQueue" />
</rabbit:queue-arguments>
</rabbit:queue>
3.2.2 创建转发后的消息队列(Queue2)
此队列为转发后的队列,故需要有消费类,其配置和一般的队列保持一致,例如
<!-- 延迟消息的消费队列 -->
<rabbit:queue id="delayConsumerQueue" durable="true" auto-delete="false" exclusive="false" name="delayConsumerQueue"/>
3.2.3在交换机上进行绑定
交换机需要绑定队列和转发的key之间的关系,故需要注意的是Queue1的x-dead-letter-routing-key一定要和交换机中Queue2绑定的key保持一致,例如上述两个队列绑定后的配置示例为:
<rabbit:direct-exchange name="mq-exchange" durable="true" auto-delete="false" id="mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="delayConsumerQueue" key="delayConsumerQueue" />
<rabbit:binding queue="delayFiveMinuteQueue" key="delayFiveMinuteQueue" />
</rabbit:bindings>
</rabbit:direct-exchange>
3.3不使用插件实现延迟消息的局限性
可以看到如果不使用插件,延迟消息的延迟时间是依赖于Queue1的x-message-ttl的,也就是说,需要支持多少种延迟的时间,就得提前设置好多少个无消费类的Queue,而且由于转发绑定的Queue2需要配到交换机中,比较死板,而真实的业务中消费类肯定是不一样的,故我真正在实现的时候在发到Queue1之前把消息体及需要消费的Queue(假设名称为Queue3)进行了落库,Queue2中进行消费时,实际是查库把消息读出来然后发送到了Queue3
而如果使用插件就没有以上的局限
四、rabbitMQ使用插件实现延迟消息
4.1 插件简介
在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
4.2 插件使用方式
4.2.1 安装
进入插件安装目录
{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)
下载插件
(如果下载的文件名称不规则就手动重命名一下如:rabbitmq_delayed_message_exchange-0.0.1.ez)
4.2.2 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
(关闭插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
4.2.3 插件使用
在需要发送延迟消息队列的项目中,声明一个x-delayed-message类型的交换机来使用delayed-messaging特性,注意这个交换机并不是rabbitmq本身的,而是插件提供的,一定要是x-delayed-message类型,绑定的queue就是正常的queue即可,不需要额外多余的queue(这是和不用插件方式的最大区别及好处),例如
<rabbit:direct-exchange name="mq-exchange-delayed" durable="true" auto-delete="false" id="mq-exchange-delayed" delayed="true">
<rabbit:bindings>
<rabbit:binding queue="demoQueue" key="demoQueue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
消息发送时,在header添加"x-delay"参数来控制消息的延时时间,如果使用的是spring-rabbit中的RabbitTemplate,只需要通过messageProperties.setDelay(delay)方法set上延迟时间即可(单位为毫秒)