在上一篇博文中提到,在消息确认消费的过程中,即消息处理过程中出现了异常,为避免消息重新归入队列又继续异常,也为了避免消息不归入队列而把消息丢弃掉,那么可以采用死信队列来处理该情况,当然这个也要结合实际场景,也不一定非要用死信队列,之前遇到过的场景就没采用死信队列,是这样的场景:同步订单后需要发送订单消息去处理,也没用死信队列,异常可以触发邮件告警,之后丢弃消息,而后处理完异常被丢弃的消息可以调用api触发再一次同步订单,故也没采用死信队列。
死信队列其实也是类似于普通的队列,有交换机、队列、路由等信息,只不过是叫做死信交换机和死信路由以及死信队列,相对特殊了一点,是在正常的队列中绑定了这个特殊的队列的交换机以及路由信息,这样一来正常的队列消息出现特殊的情况下(称为死信消息)可以把这个消息转向这个特殊的队列,即死信队列。
那么什么样的消息才会变成死信消息呢?
- 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
对于第一点,可以针对消息消费过程中出现异常情况,把消息拒绝转向死信队列,对于第二点,可以利用起来变成延迟队列,其实延迟队列也是死信队列的另一种体现,场景是给消息设置时间,消息时间一到即变成死信消息,转向死信队列,故也称为延迟队列,延迟队列在实际生产环境中有更加广泛的应用。
一、死信队列
-
死信队列流程图
-
死信队列代码:
(1)首先是创建正常的队列以及在这个队列中绑定特殊的队列的交换机以及路由信息,在rabbitmq中是通过在创建队列的过程中增加附加参数x-dead-letter-exchange、x-dead-letter-routing-key,这两个参数是死信交换机和死信路由,我们知道在directExchange交换机类型中,交换机和路由可以为消息指引到队列,那么有了这两个参数,就可以绑定死信队列了。
- RabbitmqConfig配置类,创建两个队列,一个正常的普通队列directDeadPreQueue,以及死信队列deadQueue,普通队列创建的时候增加额外信息死信交换机x-dead-letter-exchange和死信路由x-dead-letter-routing-key。
@Slf4j
@Configuration
public class RabbitmqConfig {
//演示死信队列,为directExchange消息模型队列绑定死信队列
@Bean
public Queue directDeadPreQueue() {
//创建死信队列的组成成分map,用于存放组成成分的相关成员
Map<String, Object> args = new <String, Object>HashMap(2);
//设死信交换机
args.put("x-dead-letter-exchange", RabbitMqConstants.DEAD_EXCHANGE);
//死信队列的路由
args.put("x-dead-letter-routing-key", RabbitMqConstants.DEAD_ROUTING_KEY);
return new Queue(RabbitMqConstants.DIRECT_QUEUE_DEAD_PRE, true, false, false, args);
}
//交换机
@Bean
public DirectExchange directDeadPreExchange() {
return new DirectExchange(RabbitMqConstants.DIRECT_EXCHANGE_DEAD_PRE, true, false);
}
//交换机路由绑定队列
@Bean
public Binding directDeadPreBinding() {
return BindingBuilder.bind(directDeadPreQueue()).to(directDeadPreExchange()).with(RabbitMqConstants.DIRECT_ROUTING_KEY_DEAD_PRE);
}
//死信队列
@Bean
public Queue deadQueue() {
return new Queue(RabbitMqConstants.DEAD_QUEUE, true);
}
//死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(RabbitMqConstants.DEAD_EXCHANGE, true, false);
}
//路由交换机绑定死信队列
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(RabbitMqConstants.DEAD_ROUTING_KEY);
}
}
- RabbitMqConstants常量值类
@Data
public class RabbitMqConstants {
//演示死信队列,为directExchange消息模型队列绑定死信队列
public static final String DIRECT_QUEUE_DEAD_PRE = "mq.direct.queue.dead.pre";
public static final String DIRECT_EXCHANGE_DEAD_PRE = "mq.direct.exchange.dead.pre";
public static final String DIRECT_ROUTING_KEY_DEAD_PRE = "mq.direct.routing.key.dead.pre";
//死信队列
public static final String DEAD_QUEUE = "mq.dead.queue";
public static final String DEAD_EXCHANGE = "mq.dead.exchange";
public static final String DEAD_ROUTING_KEY = "mq.dead.routing.key";
}
(2)创建完队列之后,启动项目,访问http://127.0.0.1:15672/,查看rabbitmq管理后台,可以看到普通队列新增了DLX、DLK的特性,即设置了死信交换机和死信路由。
(3)普通队列-生产者-OrdinaryPublisher
public class OrdinaryPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(Order order) {
try {
//设置交换机、路由键,发送消息
rabbitTemplate.convertAndSend(RabbitMqConstants.DIRECT_EXCHANGE_DEAD_PRE, RabbitMqConstants.DIRECT_ROUTING_KEY_DEAD_PRE, order);
log.info("普通队列-生产者,发送消息:{}", order);
} catch (Exception e) {
log.error("普通队列-生产者,发送消息异常,消息:{},异常:", order, e);
}
}
}
(4)普通队列-消费者-OrdinaryConsumer,在监听到消息并且处理消息过程中故意抛除以0的异常,这样一来消息就被拒绝了。
public class OrdinaryConsumer {
@RabbitListener(queues = RabbitMqConstants.DIRECT_QUEUE_DEAD_PRE, containerFactory = "singleListenerContainerManual")
public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
try {
log.info("普通队列-消费者,监听到消息:{},准备处理业务逻辑。", order);
int i = 1 / 0;
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("普通队列-消费者,监听到消息:{},发生异常,消息不再归入队列中,转向死信队列,异常e:", order, e);
//channel.basicNack(tag, false, false);
channel.basicReject(tag, false);
}
}
}
(5)执行test方法
@Test
public void testDeadPrePublish() {
Order order = new Order();
order.setOrdernum("1234567");
ordinaryPublisher.sendMsg(order);
}
这个时候由于我们的代码注释掉了监听死信队列,故打开rabbitmq管理后台可以看到死信队列中存着死信消息:
可以有两种种方式来解决这个问题,当然问题的根本是把异常处理完,然后消息重新消费。我们假设现在已经把1/0这个异常代码修复了即注释掉了。
-
方式一,在rabbitmq管理后台取出该消息并且在rabbitmq管理后台发送该消息到普通队列中
手工取出异常消息:{"ordernum":"1234567"}
发送异常消息到原先的普通队列mq.direct.queue.dead.pre进行再次消费
- 方式二,在代码中监听死信队列
在代码中监听该死信队列,动态配置一个配置值,标识是否修复了异常信息,如果修复了,那么发送消息到原先的普通队列进行消费,如果未修复,那么消息重新归入死信队列,直到修复了异常。
监听死信队列代码DeadQueueConsumer:
public class DeadQueueConsumer {
@Autowired
private OrdinaryPublisher ordinaryPublisher;
//为方便演示,写死在这里,实际可以用配置中心apollo或者阿里云naocs动态刷新该值,即修复bug之后刷新该值为true
private Boolean dynamicRepairSign = false;
//可以注释掉监听,在rabbitmq管理后台取出该消息,等到异常处理完之后把该消息丢回原先的队列进行处理。
@RabbitListener(queues = RabbitMqConstants.DEAD_QUEUE, containerFactory = "singleListenerContainerManual")
public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
log.info("死信队列监听到消息:{}", order);
if (dynamicRepairSign) {
//修复完异常之后发送消息到原先队列进行消费
ordinaryPublisher.sendMsg(order);
channel.basicAck(tag, false);
} else {
channel.basicReject(tag, true);
}
}
}
这样的话也可以不用像方式一那样去rabbitmq管理后台取出死信队列中的消息,然后再把消息手工发送到原先的队列中,当然死信队列也不是一定要用到,要视实际场景而定。
二、延迟队列
-
延迟队列流程图:
-
延迟队列代码:
前面提到,延迟队列是死信队列的特殊情况,因为消息设置了TTL时间,消息过期了变成死信,继而可以利用该特性来做我们想做的事情,监听死信队列,在死信队列中对死信消息做业务处理,所以也称之为延迟队列。
由于其特性,消息在某段时间过后进行消费处理,延迟队列在实际开发中应用很广泛,比如12306或者其他电商等平台,下单之后,30分钟内如果未付款,那么自动取消该订单并且释放相应的库存,类似于这样的场景,用延迟队列是个很好的选择,可能有人会说,那么也可以通过用定时器的方式来实现该功能,但是定时器是轮询地去查数据库,如果在订单量很大或者像12306这样,每隔一段时间去查询一次数据库,还有多少订单未付款并且到了30分钟,这样的话会给系统数据库造成很大的压力,有可能还会压垮系统奔溃掉。
死信队列是在普通的队列中新增两个附加参数,即死信交换机和死信路由,那么延迟队列其实实现起来也很简单,由于消息过期不消费也会变成死信,那么在发送消息的时候设置消息过期时间,同时不对该普通队列进行监听消费,那么该消息不就一定会过期变成死信消息了,继而最后消息被转向了延迟队列中。
(1)首先是创建正常的队列以及在这个队列中绑定特殊的队列的交换机以及路由信息,像死信队列一样在创建队列的过程中增加附加参数x-dead-letter-exchange、x-dead-letter-routing-key,死信交换机和死信路由,另外为了实现延迟队列,需要再增加额外的参数,消息过期时间TTL,x-message-ttl参数,最后死信交换机和死信路由通过绑定关系绑定延迟队列,结合消息的TTL达到延迟的消费的作用。
- RabbitmqConfig配置类创建普通队列和延迟队列,普通队列的消息设置了TTL时间为30s
@Slf4j
@Configuration
public class RabbitmqConfig {
//延迟队列
@Bean
public Queue delayQueuePre() {
//创建延迟队列的组成成分map,用于存放组成成分的相关成员
Map<String, Object> args = new <String, Object>HashMap(16);
//设置消息过期之后的死信交换机(真正消费的交换机)
args.put("x-dead-letter-exchange", RabbitMqConstants.DELAY_EXCHANGE);
//设置消息过期之后死信队列的路由(真正消费的路由)
args.put("x-dead-letter-routing-key", RabbitMqConstants.DELAY_ROUTING_KEY);
//设定消息的TTL,单位为ms,在这里指的是30s
args.put("x-message-ttl", 30000);
return new Queue(RabbitMqConstants.DELAY_QUEUE_PRE, true,false,false, args);
}
//直连传输directExchange消息模型-交换机
@Bean
public DirectExchange delayExchangePre() {
return new DirectExchange(RabbitMqConstants.DELAY_EXCHANGE_PRE, true, false);
}
//直连传输directExchange消息模型-路由交换机绑定队列
@Bean
public Binding delayBindingPre() {
return BindingBuilder.bind(delayQueuePre()).to(delayExchangePre()).with(RabbitMqConstants.DELAY_ROUTING_KEY_PRE);
}
//延迟队列(真正处理消息的队列)
@Bean
public Queue delayQueue() {
return new Queue(RabbitMqConstants.DELAY_QUEUE, true);
}
//死信交换机(真正处理消息的交换机)
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(RabbitMqConstants.DELAY_EXCHANGE, true, false);
}
//死信交换机、死信路由绑定延迟队列
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitMqConstants.DELAY_ROUTING_KEY);
}
}
- RabbitMqConstants常量值
@Data
public class RabbitMqConstants {
//演示延迟队列,为directExchange消息模型队列绑定延迟队列
public static final String DELAY_QUEUE_PRE = "mq.direct.queue.delay.pre";
public static final String DELAY_EXCHANGE_PRE = "mq.direct.exchange.delay.pre";
public static final String DELAY_ROUTING_KEY_PRE = "mq.routing.key.delay.pre";
//延迟队列
public static final String DELAY_QUEUE = "mq.delay.queue";
public static final String DELAY_EXCHANGE = "mq.delay.exchange";
public static final String DELAY_ROUTING_KEY = "mq.delay.routing.key";
}
(2)创建完队列之后,启动项目,访问http://127.0.0.1:15672/,查看rabbitmq管理后台
(3)发送消息到普通队列中,并且不对该队列进行监听消费消息,让该消息自动达到过期时间转向延迟队列中
- 普通队列-生产者-DelayQueuePrePublisher
public class DelayQueuePrePublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMsg(Order order) {
try {
//设置延迟队列交换机、延迟队列路由键,消息实体并且发送消息
rabbitTemplate.convertAndSend(RabbitMqConstants.DELAY_EXCHANGE_PRE, RabbitMqConstants.DELAY_ROUTING_KEY_PRE, order);
log.info("延迟队列消息发送成功,消息:{},发送时间:{}", order, LocalDateTime.now());
} catch (Exception e) {
log.error("延迟队列消息发送异常,消息:{},异常e:", order, e);
}
}
}
(4)监听延迟队列,当普通队列中的消息30秒过期了之后变成死信消息,会转向被该队列监听到
- 延迟队列-消费者-DelayQueueConsumer
public class DelayQueueConsumer {
@RabbitListener(queues = RabbitMqConstants.DELAY_QUEUE, containerFactory = "singleListenerContainerAuto")
public void consumeMsg(Order order) {
try {
log.info("延迟队列-30s时间到达后,真正消费消息的队列,监听消息:{},当前时间:{}", order, LocalDateTime.now());
} catch (Exception e) {
log.error("延迟队列-30s时间到达后,真正消费消息的队列,监听消息:{},处理发生异常e:", order, e);
}
}
}
(5)运行test方法:
@Test
public void testDelayPublish() {
Order order = new Order();
order.setOrdernum("1234567");
delayQueuePrePublisher.sendMsg(order);
}
从打印日志可以看出,发送消息时间是:2021-03-02 01:25:14.976,消费消息时间是:2021-03-02 01:25:44.992,相差是30秒(4毫秒误差就不计啦),从而实现了延迟队列的功能。
参考资料:
《rabbitmq实战指南》
《分布式中间件实战》