死信队列介绍
- 死信队列:DLX,
dead-letter-exchange
- 利用DLX,当消息在一个队列中变成死信
(dead message)
之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息变成死信有以下几种情况
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期
- 队列达到最大长度
死信处理过程
- DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
- 可以监听这个队列中的消息做相应的处理。
死信队列设置
- 首先需要设置死信队列的exchange和queue,然后进行绑定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
#表示只要有消息到达了Exchange,那么都会路由到这个queue上
- 然后需要有一个监听,去监听这个队列进行处理
- 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:
arguments.put(" x-dead-letter-exchange","dlx.exchange");
,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!
死信队列演示
生产端
public class Producer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
//发送消息
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
自定义消费者
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
消费端
- 声明正常处理消息的交换机、队列及绑定规则
- 在正常交换机上指定死信发送的Exchange
- 声明死信交换机、队列及绑定规则
- 监听死信队列,进行后续处理,这里省略
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//指定死信发送的Exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
运行说明
启动消费端,此时查看管控台,新增了两个Exchange,两个Queue。在test_dlx_queue
上我们设置了DLX,也就代表死信消息会发送到指定的Exchange上,最终其实会路由到dlx.queue
上。
此时关闭消费端,然后启动生产端,查看管控台队列的消息情况,test_dlx_queue
的值为1,而dlx_queue
的值为0。
10s后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。
实际环境我们还需要对死信队列进行一个监听和处理,当然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。