前言
工作中接到一个需求,在创建订单时间后的5分钟需要给用户发送一次消息提醒,该消息提醒送达时机偏差不能太大。实际上就是一个定时任务的需求。
方案有几种:
一、定时器扫描:XXL-JOB、QUAZRT等;
二、Redis过期key回调
三、RabbitMq死信队列发送延时消息
方案一、定时器扫描
这是最最简单的方案,有两种实现方式。
1.根据定时器设置的执行时机,执行sql对数据表中某时间字段与当前时间的值进行比较,获取到过期或到期的所有记录,对记录进行批量操作。
2.在新增记录时,设置一个与记录关联的redis过期key,根据定时器设置的执行时机,获取过期的key,进而获取到过期或到期的所有记录,对记录进行批量操作。
优点:实现逻辑简单,后期业务调整方便
缺点:
1.设定执行间隔长:定时器设定的执行间隔过长,会导致执行业务的时机有较大的偏差,如果业务的数据量大时,间隔时间过大会导致任务量堆叠,每次执行都会占用较大的系统资源。
2.设定执行间隔短:占用系统的时机更频繁了,也会占用系统的资源。
方案二、Redis过期key回调
设想:
在新增记录时,设置一个与记录关联的redis过期key,redis在key过期时回调应用,应用中设置的监听器能收到回调消息,对该过期记录进行操作。
实际:
开启redis回调机制需要修改redis配置文件中的notify-keyspace-events参数
notify-keyspace-events的枚举值:
输入的参数中至少要有一个 K 或者 E,否则的话,不管其余的参数是什么,都不会有任何通知被分发。
如: notify-keyspace-events "Ex" 表示对过期事件进行通知发送; notify-keyspace-events "kx" 表示想监控某个 key 的失效事件。将参数设为字符串 AKE 表示发送所有类型的通知。
回调的执行时机:key过期并被redis中的设置过期删除策略删除后才会回调。
经过开发和自测,回调执行的时间和设定的key过期时机有很大偏差。
redis的过期策略
redis中key的六大淘汰机制
noeviction:当内存使用达到阈值的时候,执行命令直接报错
allkeys-lru:在所有的key中,优先移除最近未使用的key。(推荐)
volatile-lru:在设置了过期时间的键空间中,优先移除最近未使用的key。
allkeys-random:在所有的key中,随机移除某个key。
volatile-random:在设置了过期时间的键空间中,随机移除某个key。
volatile-ttl:在设置了过期时间的键空间中,具有更早过期时间的key优先移除。
redis.conf中配置maxmemory <bytes> 比如:maxmemory 300mb开始执行淘汰
redis.conf中配置maxmemory-policy 比如:maxmemory-policy volatile-lru</pre>
方案三、RabbitMq死信队列发送延时消息
原理:
RabbitMq过期消息转移到死信队列特性
总结:正常队列绑定一个死信队列,不设置正常队列消费者。发送方发送消息到正常队列,消息过期后被转移到死信队列,死信队列的消费者就会消息过期的消息。
上代码:
交换机配置类
@Configuration
public class DelayExchangeConfig {
/**
*缩略了ConnectionFactory的配置,请自行配置
*/
//创建一个正常交换机bean
@Bean
DirectExchange initExchange() {
return new DirectExchange(RabbitmqConfig.Exchange.DELAY_INIT_EXCHANGE);
}
//创建一个死信交换机bean
@Bean
DirectExchange delayExchange() {
return new DirectExchange(RabbitmqConfig.Exchange.DELAY_EXCHANGE);
}
//创建一个rabbitAdmin,类似控制台的功能
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
}
消息发送方基类
public abstract class BaseDelayProducer {
Queue initQueue = null;
Queue delayQueue = null;
DirectExchange initExchange = null;
DirectExchange delayExchange = null;
ApplicationContext context = null;
AmqpTemplate amqpTemplate = null;
void setApplicationContext(ApplicationContext context){
this.context = context;
}
void setAmqpTemplate(AmqpTemplate amqpTemplate){
this.amqpTemplate = amqpTemplate;
}
void setInitExchange(ApplicationContext context){
DirectExchange initExchange = (DirectExchange) context.getBean("initExchange");
this.initExchange = initExchange;
}
void setDelayExchange(ApplicationContext context){
DirectExchange delayExchange = (DirectExchange) context.getBean("delayExchange");
this.delayExchange = delayExchange;
}
void setInitQueue(String initQueueName,String delayQueueName,int delayMillisecond){
String delayExchangeName = this.delayExchange.getName();
this.initQueue = QueueBuilder.durable(initQueueName)
.withArgument("x-dead-letter-exchange", delayExchangeName) // DLX
.withArgument("x-dead-letter-routing-key", delayQueueName) // dead letter携带的routing key
.withArgument("x-message-ttl", delayMillisecond) // 设置队列的过期时间
.build();
}
void setDelayQueue(String delayQueueName){
this.delayQueue = QueueBuilder.durable(delayQueueName).build();
}
//初始化发送者
/**
*
* @param rabbitAdmin rabbitmq管理控制台
* @param initQueueName 正常队列名
* @param delayQueueName 死信队列名
* @param delayMillisecond 延时时间,单位:毫秒
* @param context ApplicationContext对象
* @param amqpTemplate AmqpTemplate rabbitmq客户端
*/
protected void initBinding(RabbitAdmin rabbitAdmin
, String initQueueName
, String delayQueueName
, int delayMillisecond
, ApplicationContext context
, AmqpTemplate amqpTemplate){
setApplicationContext(context);
setAmqpTemplate(amqpTemplate);
setDelayExchange(context);
setInitExchange(context);
setDelayQueue(delayQueueName);
setInitQueue(initQueueName, delayQueueName, delayMillisecond);
Binding delayBind = BindingBuilder.bind(this.delayQueue)
.to(this.delayExchange)
.with(this.delayQueue.getName());
Binding initBind = BindingBuilder.bind(this.initQueue)
.to(this.initExchange)
.with(this.initQueue.getName());
rabbitAdmin.declareQueue(initQueue);
rabbitAdmin.declareQueue(delayQueue);
rabbitAdmin.declareExchange(initExchange);
rabbitAdmin.declareExchange(delayExchange);
rabbitAdmin.declareBinding(delayBind);
rabbitAdmin.declareBinding(initBind);
}
//发送延时消息方法
public void sendMessge(Object message){
this.amqpTemplate.convertAndSend(this.initExchange.getName(),this.initQueue.getName(),message);
}
}
消费方实现类(不同类型的定时任务都需要配置不同的实现类)
@Service
public class OrderDelayProducer extends BaseDelayProducer {
@Resource
private AmqpTemplate amqpTemplate;
@Resource
private ApplicationContext context;
@Resource
private RabbitAdmin rabbitAdmin;
//必须在Spring初始化消费方实现类后调用initBinding方法进行初始化配置
@PostConstruct
private void init() {
initBinding(rabbitAdmin,"test_order_init","test_order_delay",20000,context,amqpTemplate);
}
}
调用发送延时消息方法
@RestController
public class TestSendMessage {
@Resource
private OrderDelayProducer orderDelayProducer;
@GetMapping("/send")
public void send(@RequestParam("content")String content){
orderDelayProducer.sendMessge(content);
}
//过20秒才会回调
@RabbitListener(queues = "test_order_delay")
public void handleMessage(byte[] message){
System.out.println("消费消息");
System.out.println(new String(message));
}
}
优点:
时效性强,rabbitmq的时效性为us级。
可用性强,基于主从结构的rabbitmq可以有多个节点顶替坏节点
吞吐量大(相对于定时任务):单机吞吐量为万级
缺点:
依赖网络带宽性能,网络带宽的性能影响消息的传输时效性,所以建议mq服务器和应用服务器部署在同一内网,在内网进行数据交互。