简介
死信队列并不是一种特殊的队列,和普通的队列的定义并没有任何的不同,之所以称之为死信队列,是因为我们很少直接往死信队列存放消息,而是通过其他队列的参数设定,在消息过期、多次尝试但未成功消费后,由RabbitMQ来完成消息的转存。其流程图如下:
- 将队列
dead_letter_queue
使用路由键dead_letter_routing_key
绑定到dead_letter_exchange
这个交换机上。 - 将队列
msg-queue
使用路由键msg
绑定到msg-exchange
绑定到msg-exchange
这个交换机上。 -
msg-queue
的x-dead-letter-exchange
参数是死信队列的交换机;x-dead-letter-routing-key
是死信队列路由键;x-message-ttl
是消息的最大存活时间。当msg-queue
中的某条消息超过了存活的时间,就会通过dead_letter_exchange
这个交换机,发送到路由键为dead_letter_routing_key
对应的队列中。
代码实现
@Slf4j
@Component
public class DeadQueue {
// 死信队列名
private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
// 死信交换机
private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
// 死信路由键
private static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";
private RabbitAdmin rabbitAdmin;
public DeadQueue(RabbitAdmin rabbitAdmin) {
this.rabbitAdmin = rabbitAdmin;
}
@PostConstruct
public void initDeadQueue() {
Queue deadQueue = QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
rabbitAdmin.declareQueue(deadQueue); //创建死信队列
Exchange deadExchange = ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
.durable(true).build();
rabbitAdmin.declareExchange(deadExchange); //创建死信交换机
Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange)
.with(DEAD_LETTER_ROUTING_KEY).noargs();
rabbitAdmin.declareBinding(binding); // 将队列绑定到交换机上
log.info("死信队列:{}, 死信交换机: {}, 已经成功绑定.", DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE);
}
/**
* x-dead-letter-exchange: 死信队列交换机
* x-dead-letter-routing-key: 死信队列路由键
* x-message-ttl: 消息在队列中最大的存活时间, 如果没有被消费,就会进入到死信队列
*/
@RabbitListener(bindings = @QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(
value = "msg-queue",
durable = "true",
arguments = {@Argument(name = "x-dead-letter-exchange", value = DEAD_LETTER_EXCHANGE),
@Argument(name = "x-dead-letter-routing-key", value = DEAD_LETTER_ROUTING_KEY),
@Argument(name = "x-message-ttl", value = "20000", type = "java.lang.Long")
}
),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "msg-exchange"),
key = {"msg"}
))
public void receiveMsg(Message message, Channel channel) throws Exception{
log.info("消息体:{}", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
RabbitAdmin 的注入
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
}
测试
程序启动后然后关闭(目的是创建队列),将 DeadQueue
中 receiveMsg
对应的方法注释。然后运行如下代码:
@Test
public void sendMsg2() {
// 消息属性
MessageProperties messageProperties = MessagePropertiesBuilder
.newInstance().setMessageId(UUID.randomUUID().toString())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000")
.build();
// 消息
Message message = new Message(JSONObject.toJSONBytes(new User(23, "张三")), messageProperties);
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("msg-exchange","msg", message, data);
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
当同时指定了队列的
x-message-ttl
和消息的expiration
的值的时候,取他们中最小的值为超时时间,超出后会发送到死信队列。