基于RabbitMq的定时任务工具

前言

工作中接到一个需求,在创建订单时间后的5分钟需要给用户发送一次消息提醒,该消息提醒送达时机偏差不能太大。实际上就是一个定时任务的需求。

方案有几种:

一、定时器扫描:XXL-JOB、QUAZRT等;

二、Redis过期key回调

三、RabbitMq死信队列发送延时消息

方案一、定时器扫描

这是最最简单的方案,有两种实现方式。

1.根据定时器设置的执行时机,执行sql对数据表中某时间字段与当前时间的值进行比较,获取到过期或到期的所有记录,对记录进行批量操作。

2.在新增记录时,设置一个与记录关联的redis过期key,根据定时器设置的执行时机,获取过期的key,进而获取到过期或到期的所有记录,对记录进行批量操作。

优点:实现逻辑简单,后期业务调整方便

缺点:

1.设定执行间隔长:定时器设定的执行间隔过长,会导致执行业务的时机有较大的偏差,如果业务的数据量大时,间隔时间过大会导致任务量堆叠,每次执行都会占用较大的系统资源。

2.设定执行间隔短:占用系统的时机更频繁了,也会占用系统的资源。

方案二、Redis过期key回调

设想:

在新增记录时,设置一个与记录关联的redis过期key,redis在key过期时回调应用,应用中设置的监听器能收到回调消息,对该过期记录进行操作。

实际:

开启redis回调机制需要修改redis配置文件中的notify-keyspace-events参数

notify-keyspace-events的枚举值:

1302061-20190405184708742-1601460759.png

输入的参数中至少要有一个 K 或者 E,否则的话,不管其余的参数是什么,都不会有任何通知被分发。

如: notify-keyspace-events "Ex" 表示对过期事件进行通知发送; notify-keyspace-events "kx" 表示想监控某个 key 的失效事件。将参数设为字符串 AKE 表示发送所有类型的通知。

回调的执行时机:key过期并被redis中的设置过期删除策略删除后才会回调。

经过开发和自测,回调执行的时间和设定的key过期时机有很大偏差。

redis的过期策略

redis中key的六大淘汰机制
noeviction:当内存使用达到阈值的时候,执行命令直接报错
allkeys-lru:在所有的key中,优先移除最近未使用的key。(推荐)
volatile-lru:在设置了过期时间的键空间中,优先移除最近未使用的key。
allkeys-random:在所有的key中,随机移除某个key。
volatile-random:在设置了过期时间的键空间中,随机移除某个key。
volatile-ttl:在设置了过期时间的键空间中,具有更早过期时间的key优先移除。

redis.conf中配置maxmemory <bytes> 比如:maxmemory 300mb开始执行淘汰
redis.conf中配置maxmemory-policy 比如:maxmemory-policy volatile-lru</pre>

方案三、RabbitMq死信队列发送延时消息

原理:

RabbitMq过期消息转移到死信队列特性

aHR0cDovL2RsMi5pdGV5ZS5jb20vdXBsb2FkL2F0dGFjaG1lbnQvMDEzMC84NjM4LzRjZjU0NTc0LTM5YjUtM2M0My04NGRkLWM1ZGEzOTYzMjI0Ni5wbmc.png

总结:正常队列绑定一个死信队列,不设置正常队列消费者。发送方发送消息到正常队列,消息过期后被转移到死信队列,死信队列的消费者就会消息过期的消息。

上代码:

交换机配置类

@Configuration
public class DelayExchangeConfig {
 /**
 *缩略了ConnectionFactory的配置,请自行配置
 */

 //创建一个正常交换机bean
 @Bean
 DirectExchange initExchange() {
     return new DirectExchange(RabbitmqConfig.Exchange.DELAY_INIT_EXCHANGE);
 }

 //创建一个死信交换机bean
 @Bean
 DirectExchange delayExchange() {
     return new DirectExchange(RabbitmqConfig.Exchange.DELAY_EXCHANGE);
 }

 //创建一个rabbitAdmin,类似控制台的功能
 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
     return new RabbitAdmin(connectionFactory);
 }

}

消息发送方基类

public abstract class BaseDelayProducer {
 Queue initQueue = null;
 Queue delayQueue = null;
 DirectExchange initExchange = null;
 DirectExchange delayExchange = null;
 ApplicationContext context = null;
 AmqpTemplate amqpTemplate = null;

 void setApplicationContext(ApplicationContext context){
     this.context = context;
 }

 void setAmqpTemplate(AmqpTemplate amqpTemplate){
     this.amqpTemplate = amqpTemplate;
 }

 void setInitExchange(ApplicationContext context){
     DirectExchange initExchange = (DirectExchange) context.getBean("initExchange");
     this.initExchange = initExchange;
 }

 void setDelayExchange(ApplicationContext context){
     DirectExchange delayExchange = (DirectExchange) context.getBean("delayExchange");
     this.delayExchange = delayExchange;
 }

 void setInitQueue(String initQueueName,String delayQueueName,int delayMillisecond){
     String delayExchangeName = this.delayExchange.getName();
     this.initQueue = QueueBuilder.durable(initQueueName)
             .withArgument("x-dead-letter-exchange", delayExchangeName) // DLX
             .withArgument("x-dead-letter-routing-key", delayQueueName) // dead letter携带的routing key
             .withArgument("x-message-ttl", delayMillisecond) // 设置队列的过期时间
             .build();

 }

 void setDelayQueue(String delayQueueName){
      this.delayQueue = QueueBuilder.durable(delayQueueName).build();
 }

 //初始化发送者
 /**
 * 
 * @param rabbitAdmin rabbitmq管理控制台
 * @param initQueueName 正常队列名
 * @param delayQueueName 死信队列名
 * @param delayMillisecond 延时时间,单位:毫秒
 * @param context ApplicationContext对象
 * @param amqpTemplate AmqpTemplate rabbitmq客户端
 */
 protected void initBinding(RabbitAdmin rabbitAdmin
                     , String initQueueName
                     , String delayQueueName
                     , int delayMillisecond
                     , ApplicationContext context
                     , AmqpTemplate amqpTemplate){
          setApplicationContext(context);
          setAmqpTemplate(amqpTemplate);
          setDelayExchange(context);
          setInitExchange(context);
          setDelayQueue(delayQueueName);
          setInitQueue(initQueueName, delayQueueName, delayMillisecond);
          Binding delayBind = BindingBuilder.bind(this.delayQueue)
                    .to(this.delayExchange)
                    .with(this.delayQueue.getName());

          Binding initBind = BindingBuilder.bind(this.initQueue)
                    .to(this.initExchange)
                    .with(this.initQueue.getName());

          rabbitAdmin.declareQueue(initQueue);
          rabbitAdmin.declareQueue(delayQueue);
          rabbitAdmin.declareExchange(initExchange);
          rabbitAdmin.declareExchange(delayExchange);
          rabbitAdmin.declareBinding(delayBind);
          rabbitAdmin.declareBinding(initBind);
 }

 //发送延时消息方法
 public void sendMessge(Object message){
           this.amqpTemplate.convertAndSend(this.initExchange.getName(),this.initQueue.getName(),message);
 }

}

消费方实现类(不同类型的定时任务都需要配置不同的实现类)

@Service
public class OrderDelayProducer extends BaseDelayProducer {
 @Resource
 private AmqpTemplate amqpTemplate;
 @Resource
 private ApplicationContext context;
 @Resource
 private RabbitAdmin rabbitAdmin;

 //必须在Spring初始化消费方实现类后调用initBinding方法进行初始化配置
 @PostConstruct
 private void init() {
       initBinding(rabbitAdmin,"test_order_init","test_order_delay",20000,context,amqpTemplate);
 }

}

调用发送延时消息方法

@RestController
public class TestSendMessage {
 @Resource
 private OrderDelayProducer orderDelayProducer;

 @GetMapping("/send")
 public void send(@RequestParam("content")String content){
       orderDelayProducer.sendMessge(content);
 }

 //过20秒才会回调
 @RabbitListener(queues = "test_order_delay")
 public void handleMessage(byte[] message){
       System.out.println("消费消息");
       System.out.println(new String(message));
 }

}

优点:

时效性强,rabbitmq的时效性为us级。

可用性强,基于主从结构的rabbitmq可以有多个节点顶替坏节点

吞吐量大(相对于定时任务):单机吞吐量为万级

缺点:

依赖网络带宽性能,网络带宽的性能影响消息的传输时效性,所以建议mq服务器和应用服务器部署在同一内网,在内网进行数据交互。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容