1.什么是死信(Dead Letter)?
官方文档:https://www.rabbitmq.com/dlx.html
一个消息变成死信的条件有:
1.消费被拒绝(basic.reject 或者 basic.nack),并且参数 requeue = false 时
2.消息TTL(存活时间)过期
3.队列达到最大长度
rabbitMQ对于死信消息的处理是:如果配置了死信队列,成为死信的消息会被丢进死信队列,如果没有则被丢弃。
2.配置死信队列
死信队列不是特殊的队列,它只是绑定在了死信交换机上而已,并且死信交换机也不是特殊的交换机,它也只是用来接收死信消息的
一般分为以下步骤:
1.配置业务队列,绑定到业务交换机上
2.为业务队列配置死信交换机和路由
3.配置死信队列,绑定到死信交换机上
模拟其中一种条件来进行实战!
配置类:
@Configuration
public class RabbitMQConfig {
/**
* 业务交换机
*/
public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
/**
* 业务队列
*/
public static final String BUSINESS_QUEUE_NAME = "business.queue";
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
/**
* 死信队列
*/
public static final String DEAD_QUEUE_NAME = "dead.queue";
public static final String DEAD_ROUTING_KEY = "dead.routing.key";
/**
* 声明业务交换机
**/
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
/**
* 声明业务队列
*
**/
@Bean("businessQueue")
public Queue businessQueue(){
Map<String, Object> args = new HashMap<>(2);
//声明绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//声明死信路由key
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
}
/**
* 业务队列绑定到业务交换机
*
**/
@Bean
public Binding businessBind(@Qualifier("businessQueue")Queue businessQueue,
@Qualifier("businessExchange")FanoutExchange businessExchange){
return BindingBuilder.bind(businessQueue).to(businessExchange);
}
/**
* 声明死信交换机
**/
@Bean("deadExchange")
public DirectExchange deadExchange(){
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
/**
* 声明死信队列
*
**/
@Bean("deadQueue")
public Queue deadQueue(){
return new Queue(DEAD_QUEUE_NAME);
}
/**
* 死信队列绑定到死信交换机
*
**/
@Bean
public Binding deadBind(@Qualifier("deadQueue")Queue deadQueue,
@Qualifier("deadExchange")DirectExchange deadExchange){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
}
}
业务队列监听
@Component
public class BusinessListener {
Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE_NAME)
public void receive(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
logger.info("收到消息:" + msg);
if (msg.contains("dead")){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
死信队列监听
@Component
public class DeadListener {
Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_NAME)
public void receive(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
logger.info("收到消息:" + msg);
}
}
写一个controller来生产消息,方便测试
@RestController
@RequestMapping("my")
public class MyController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("send")
public void send(@RequestParam("msg")String msg){
rabbitTemplate.convertSendAndReceive(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,"", msg);
}
}
发送一个消息
结果
发送一个死信消息
结果