官网地址:https://www.rabbitmq.com/ttl.html
TTL(Time To Live)代表的是一条消息或者该队列中的所有消息的最大存活时间,如果再这段时间内消息没有被消息,则该消息会成为死信。
1.如何设置TTL
RabbitMQ允许给消息或者队列设置TTL,如果一个设置了TTL的消息进入了一个设置了TTL的队列,则最小的TTL生效。
创建队列的时候设置TTL:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
这样所有被投递到该队列的消息都最多不会存活超过6s
对每一条消息设置TTL:
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
这样这条消息的过期时间也被设置成了6s
在使用第二种方式设置TTL的时候需要注意,过期的消息可能排在未过期的消息后面,即消息过期不会被立马丢弃,只有当消息准备投递到消费者之前才会判定为过期,例如,消息A比消息B先进了消息队列,消息B的过期时间为5S,在消息B的存活时间内,消息A一直还未被消费,即使5S过去了消息B已经过期,但它仍然在队列中,直到消息A被消费。
2.实战!
队列设置ttl
配置类:
@Configuration
public class TTLMQConfig {
public static final String TTL_QUEUE_NAME = "ttl.queue";
public static final String TTL_EXCHANGE_NAME = "ttl.exchange";
@Bean("ttlQueue")
public Queue ttlQueue(){
Map<String, Object> args = new HashMap<>(2);
args.put("x-message-ttl", 30000);
return QueueBuilder.durable(TTL_QUEUE_NAME).withArguments(args).build();
}
@Bean("ttlExchange")
public FanoutExchange ttlExchange(){
return new FanoutExchange(TTL_EXCHANGE_NAME);
}
@Bean
public Binding ttlBind(@Qualifier("ttlQueue")Queue ttlQueue,
@Qualifier("ttlExchange")FanoutExchange ttlExchange){
return BindingBuilder.bind(ttlQueue).to(ttlExchange);
}
}
同样也是写一个controller来生产消息
@PostMapping("ttlSend")
public void ttlSend(@RequestParam("msg")String msg){
rabbitTemplate.convertSendAndReceive(TTLMQConfig.TTL_EXCHANGE_NAME,"", msg);
}
发送了一条消息,在30S之后被丢弃了
再来一条
消息设置ttl
同样是往设置了30Sttl的队列中发送消息,设置消息的ttl为10S
@PostMapping("ttlMsgSend")
public void ttlMsgSend(@RequestParam("msg")String msg){
rabbitTemplate.convertAndSend(TTLMQConfig.TTL_EXCHANGE_NAME,"", msg, message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
}
可以看到消息在10S后自动消失了,同时也验证了上面说到的最小的TTL生效。