场景
存在待处理事项,3分钟内未处理转至部门负责人。 5分钟部分负责人未处理则关单。处理后30分钟内未回复则提醒。
最初解决方案
使用RabbitMQ TTL + 死信队列实现.
定义一个延时队列,给该队列发送的消息都设置expiration属性。该队列会自动转发至最终的一个队列。
代码示例
// RabbitMqConfig 定义相关的queue和exchange
@Configuration
public class RabbitMQConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_test_exchange" false);
}
@Bean
public Queue expireQueue(){
return new Queue("expire_queue");
}
@Bean
public Binding expireQueueBindDirectBinding() {
return BindingBuilder.bind(expireQueue()).to(directExchange()).with("expire.queue.key");
}
@Bean
public Queue delayQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "direct_test_exchange");
arguments.put("x-dead-letter-routing-key", "expire.queue.key");
return new Queue("delay_queue", true, false, false, arguments);
}
@Bean
public Binding expireDelayQueueBindDirectBinding() {
return BindingBuilder.bind(delayQueue()).to(directExchange()).with("expire.delay.queue.key");
}
}
//Sender
@Service
public class xxxx {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg, Long expirationTime) {
logger.info("xxxxx");
rabbitTemplate.convertAndSend("direct_test_exchange", "expire.delay.queue.key", message, msg->{
msg.getMessageProperties().setExpiration(String.valueOf(expirationTime));
return msg;
});
}
}
//
public class xxxListener {
@RabbitHandler
@RabbitListener(queues = "expire_queue")
@Async
public void expire(Message message) {
// xxxxx
}
}
原因
如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,所以如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
https://www.cnblogs.com/mfrank/p/11260355.html
解决方法
- 使用rabbitmq_delayed_message_exchange插件
- 不同的过期时间用不同的延时队列。