转载:http://www.imooc.com/article/283645
1.什么是死信队列
想必有些小伙伴应该听说过‘死信队列’这个开发词汇,这个‘死信’到底是什么呢?怎么用呢?下面就给大家说下。
死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
b.TTL(time-to-live) 消息超时未消费
c.达到最大队列长度
消息变成死信后,会被重新投递(publish)到另一个交换机上(Exchange),这个交换机往往被称为DLX(dead-letter-exchange)“死信交换机”,然后交换机根据绑定规则转发到对应的队列上,监听该队列就可以被重新消费。
生产者-->发送消息-->交换机-->队列-->变成死信队列-->DLX交换机-->队列-->监听-->消费者
2.应用场景
在电商开发部分中,都会涉及到延时关闭订单,怎么实现延时关闭订单呢?
小明说:“使用定时任务去做不就行了麽”
小刚说:“使用delayQueue实现延时队列也可以嘛”
小红说:“这个我知道,使用rabbitmq利用延时策略去做”
综上所述:在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议
delayQueue(无界阻塞队列)这个大家可以去百度下
下面注重介绍下RabbitMQ
RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
特性与作用
可伸缩性:支持集群 消息持久化:从内存持久化到硬盘,再从硬盘中读取到内容
作用:解耦,异步,削峰
为什么不用delayQueue,反而建议使用rabbitmq呢?
delayQueue是java并发多线程中的一种队列实现机制,不能做集群化处理,而且在团队开发中不利于维护,rabbitmq支持集群化处理,不需要我们把更多的时间放在代码的耦合度上面。
3.利用‘死信’来实现定时关闭订单
sender
config
/**
*死信交换机,死信交换机不是特定的交换机类型,而是普通的交互机
* 根据TTL(time-to-live)消息过期策略,来实现消息的回收,被称为Dead-Letter-exchange (死信交换机DLX)
* 一般消息变为死信,有以下三种情况:
* 1.消息被拒绝消费(requeue=false)
* 2.消息已经过期(TTL)
* 3.队列达到最大长度
*/
//创建死信交换机
@Bean
publicExchange deadLetterExcahnge(){
returnnewDirectExchange("DL_EXCHANGE");
}
//创建死信队列
@Bean
publicQueue deadLetterQueue(){
Map<String,Object> maps = newHashMap<>();
maps.put(DEAD_LETTER_QUEUE_KEY,"DL_EXCHANGE");
maps.put(DEAD_LETTER_ROUTING_KEY,"KEY_R");
returnnewQueue("DEAD_LETTER_QUEUE",true,false,false,maps);
}
//定义死信转发队列,其实也就是回收站
@Bean
publicQueue reDerictQueue(){
returnnewQueue("REQUEST_DERICT_QUEUE",true);
}
/**
* 死信路由通过 DL_KEY 绑定键绑定到死信队列上
* @return
*/
@Bean
publicBinding deadLetterBinding(){
returnnewBinding("DEAD_LETTER_QUEUE",Binding.DestinationType.QUEUE,"DL_EXCHANGE","DL_KEY",null);
}
/**
* 死信路由通过 KEY_R 绑定键绑定到死信队列上
* @return
*/
@Bean
publicBinding reDirectBinding(){
returnnewBinding("REQUEST_DERICT_QUEUE",Binding.DestinationType.QUEUE,"DL_EXCHANGE","KEY_R",null);
}
receiver
@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
@RabbitListener(queues = {"REQUEST_DERICT_QUEUE"})
public void receiver(String message){
log.debug("10秒后的消息为:{}",message);
}
上面的代码呢,只是参考部分,这边给大家推荐一篇文章:
https://www.cnblogs.com/lori/archive/2018/11/19/9984760.html