延时队列-基于RabbitMq延时消息插件实现的延时队列

前言

定时调度基本是每个项目都会遇到的业务场景,一般地,都会通过任务调度工具执行定时任务完成,定时任务有两点缺陷,一、定时任务执行频度限制,实际执行的时间可能会晚于理想的设定时间,例如,如果要通过定时任务实现在下单后15分钟仍未支付则取消订单的功能,假设定时任务的执行频度为每分钟执行一次,对于有些订单而言,其实际取消时间是介于15-16分钟之间,不够精确;二、定时任务执行需要时间,定时任务的执行也需要时间,如果业务场景的数据量较大,执行一次定时任务需要足够长的时间,进一步放大了缺点一。

RabbitMq的延时队列

RabbitMq作为一种常用的消息中间件,其本身就支持延迟队列和延迟消息,可以结合死信交换机、实现定时调度的功能。
一、为队列指定TTL时间,当消息进入队列如果经过了TTL时间,则该消息会进入到死信交换机中并路由到死信队列中,对死信队列进行监听则可以完成后续业务处理;例如在订单超时未支付这个业务场景下,可以将下单消息放到订单队列中,订单队列指定死信交换机并指定订单队列的TTL时间为15分钟,对死信队列进行监听,如果订单仍未支付则取消订单;为队列指定TTL时间适合某一动作触发后相同的时间间隔后再触发另一个动作的业务场景。
二、为消息指定TTL时间,队列指定死信交换机,RabbitMq支持为每一个发送的消息单独指定一个TTL时间,但是原生的RabbitMq对每个消息指定TTL时间是有缺陷的,例如在同一个队列中指定依次入队的两个消息message1、message2的TTL时间分别为10分钟和5分钟,对死信队列进行监听,按照理想的情况,在5分钟和10分钟后分别消费了message2和message1,但是实际情况是10分钟后先消费message1随后消费message2,出现这种情况的原因是RabbitMq只会对队列头部的消息进行扫描判断其是否需要进入死信交换机中,只有当队列头部的消息消费后才会对后续消息进行消费,不适合需要按照消息指定TTL时间的业务场景中。

基于RabbitMq延时消息插件实现的延时队列

为RabbitMq安装了延时消息插件rabbitmq_delayed_message_exchange后,就能够为每个消息指定延时时间,并能够按照延时时间进行消费,如何安装延时消息插件可自行百度。
基于插件实现的延时消息实际上是通过交换机完成的,在安装了插件后,会多出一种交换机x-delayed-message,其能够对进入交换机中的所有延时消息进行扫描,当达到了设定的延时时间后再将消息投递到绑定的队列中。

代码部分

先梳理一下业务流程,假设我们现在有两个业务场景:
一、发布了某一个引流活动,每个用户参与时都会生成一个唯一的二维码,二维码需要在活动结束后被立即删除。
二、需要在指定时刻执行某一个任务,任务是可重复执行的,所以理论上来说,在任意时刻任务都有需要被执行的可能。
按照我们在前言中的分析,定时任务显然不适合上述两个业务场景。
梳理一下思路,代码部分主要有一下几个方面:

  • 延时消息,延时消息需要按照业务场景进行分类,延时消息包含有当前业务场景必要的信息、消息类型和延时时间。
  • 延时消息处理器,按照延时消息类型指定延时消息处理器,每个延时消息处理器在接收到消息后完成当前业务场景的逻辑处理。
  • 延时消息管理器,延时消息管理器需要对消息进行监听并能够添加消息和移除消息。

延时消息DelayMessage

package com.cube.share.delay.message;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.*;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * @author cube.li
 * @date 2021/9/22 15:43
 * @description 延时消息
 */
@Getter
@Setter
@ToString
public class DelayMessage implements Serializable {
    private static final long serialVersionUID = 9006297630420423520L;

    /**
     * 内容
     */
    @NonNull
    private String body;

    /**
     * 消息类型
     */
    @NonNull
    private DelayMessageType type;

    /**
     * 消息属性
     */
    @JsonIgnore
    @NonNull
    private DelayMessageProperties properties;


    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DelayMessage that = (DelayMessage) o;
        return Objects.equals(body, that.body) &&
                type == that.type;
    }

    @Override
    public int hashCode() {
        return Objects.hash(body, type);
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DelayMessageProperties implements Serializable {

        private static final long serialVersionUID = 1240631950524432277L;
        /**
         * 过期时间单位
         */
        private TimeUnit timeUnit;

        /**
         * 时长,实际的过期时间为 timeUnit * expire
         */
        private long expire;

    }

    public void check() {
        Assert.notNull(this.body, "delay message must not be null");
        Assert.notNull(this.type, "delay message type must not be null");
        Assert.notNull(this.properties, "delay message properties must not be null");
    }
}

延时消息的类型分类DelayMessageType

package com.cube.share.delay.message;

import com.cube.share.delay.handler.DelayMessageHandler;
import com.cube.share.delay.handler.ExecuteTaskDelayMessageHandler;
import com.cube.share.delay.handler.QrCodeDelayMessageHandler;
import lombok.Getter;

/**
 * @author cube.li
 * @date 2021/9/22 15:44
 * @description 延迟消息类型
 */
@Getter
public enum DelayMessageType {

    DELETE_QR_CODE("删除二维码", QrCodeDelayMessageHandler.class),
    EXECUTE_TASK("执行任务", ExecuteTaskDelayMessageHandler.class);

    private final String desc;

    /**
     * 此延时消息的处理器
     */
    private final Class<? extends DelayMessageHandler> handler;

    DelayMessageType(String desc, Class<? extends DelayMessageHandler> handler) {
        this.desc = desc;
        this.handler = handler;
    }
}

延时消息处理器 DelayMessageHandler

package com.cube.share.delay.handler;

import com.cube.share.delay.message.DelayMessage;

/**
 * @author cube.li
 * @date 2021/9/22 15:32
 * @description 延时消息处理器接口
 */
public interface DelayMessageHandler {

    /**
     * 处理消息
     *
     * @param message 消息
     */
    void handle(DelayMessage message);
}

延时消息管理器DelayMessageManager
这里声明为接口是为了以后拓展,基于redisson也可以实现延时队列,声明为接口在使用时能够在多种实现之间自由切换。

package com.cube.share.delay.manager;

import com.cube.share.delay.message.DelayMessage;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * @author cube.li
 * @date 2021/9/22 16:00
 * @description 延时消息管理器
 */
public interface DelayMessageManager extends InitializingBean, DisposableBean {

    /**
     * 添加延时消息
     *
     * @param message 延时消息
     */
    void add(DelayMessage message);

    /**
     * 移除延时消息
     *
     * @param message 待移除的消息
     * @return 移除成功返回true, 移除失败返回false
     */
    boolean remove(DelayMessage message);
}

RabbitMq延时消息配置类

package com.cube.share.delay.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author cube.li
 * @date 2021/9/22 16:16
 * @description 基于rabbitmq实现的延时消息配置
 */
@Configuration
public class RabbitMqDelayMessageConfig {

    public static final String DELAYED_QUEUE_NAME = "dm.delayed.queue";

    public static final String DELAYED_EXCHANGE_NAME = "dm.delayed.exchange";

    public static final String DELAYED_ROUTING_KEY = "dm.delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }

    @Bean
    public Binding delayedQueueBindingExchange() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
    }
}

基于RabbitMq延时消息插件实现的延时消息管理器RabbitmqDelayMessageManager

package com.cube.share.delay.manager;

import com.cube.share.base.utils.JacksonUtils;
import com.cube.share.base.utils.SpringContextUtil;
import com.cube.share.delay.config.RabbitMqDelayMessageConfig;
import com.cube.share.delay.handler.DelayMessageHandler;
import com.cube.share.delay.message.DelayMessage;
import com.cube.share.delay.message.DelayMessageType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author cube.li
 * @date 2021/9/22 16:03
 * @description 基于rabbitmq实现的延时消息管理器
 */
@Component
@DependsOn({"springContextUtil", "rabbitTemplate"})
@Slf4j
public class RabbitmqDelayMessageManager implements DelayMessageManager {

    private final Map<DelayMessageType, DelayMessageHandler> handlerMap = new ConcurrentHashMap<>(16);

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMqDelayMessageConfig.DELAYED_QUEUE_NAME, ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        String bodyString = new String(message.getBody());
        DelayMessage delayMessage = JacksonUtils.readJsonString(bodyString, DelayMessage.class);
        log.info("接收到延时消息:{}", delayMessage.toString());
        try {
            handlerMap.get(delayMessage.getType()).handle(delayMessage);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

    @Override
    public void add(DelayMessage message) {
        message.check();
        rabbitTemplate.convertAndSend(RabbitMqDelayMessageConfig.DELAYED_EXCHANGE_NAME, RabbitMqDelayMessageConfig.DELAYED_ROUTING_KEY, message, msg -> {
            MessageProperties messageProperties = msg.getMessageProperties();
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            long expiration = TimeUnit.MILLISECONDS.convert(message.getProperties().getExpire(), message.getProperties().getTimeUnit());
            messageProperties.setDelay((int) expiration);
            return msg;
        });
    }

    @Override
    public boolean remove(DelayMessage message) {
        //rabbitmq不支持从队列中移除消息
        return false;
    }

    @Override
    public void destroy() {
        //do nothing
    }

    @Override
    public void afterPropertiesSet() {
        Arrays.stream(DelayMessageType.values()).forEach(delayMessageType -> handlerMap.put(delayMessageType, SpringContextUtil.getBean(delayMessageType.getHandler())));
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }
}

两个业务场景对应的实体和消息处理器

package com.cube.share.delay.entity;

import lombok.Data;

/**
 * @author cube.li
 * @date 2021/9/22 16:11
 * @description 二维码
 */
@Data
public class QrCode {

    private String url;

    private String configId;
}
package com.cube.share.delay.handler;

import com.cube.share.base.utils.JacksonUtils;
import com.cube.share.delay.entity.QrCode;
import com.cube.share.delay.message.DelayMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author cube.li
 * @date 2021/9/22 16:08
 * @description 二维码延时消息处理器
 */
@Component
@Slf4j
public class QrCodeDelayMessageHandler implements DelayMessageHandler {

    @Override
    public void handle(DelayMessage message) {
        log.info("二维码延时消息处理中,message = {}", message.toString());
        QrCode qrCode = JacksonUtils.toJavaObject(message.getBody(), QrCode.class);
        //删除二维码
    }
}
package com.cube.share.delay.entity;

import lombok.Data;

/**
 * @author cube.li
 * @date 2021/9/22 18:01
 * @description 任务
 */
@Data
public class ExecuteTask {

    private Long id;
}
package com.cube.share.delay.handler;

import com.cube.share.delay.message.DelayMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author cube.li
 * @date 2021/9/22 18:02
 * @description 任务消息处理器
 */
@Component
@Slf4j
public class ExecuteTaskDelayMessageHandler implements DelayMessageHandler {

    @Override
    public void handle(DelayMessage message) {
        log.info("任务延时消息处理中,message={}", message);
    }
}
测试

通过单元测试发送几条信息并分别指定延时时间为10s、5s、9s

package com.cube.share.delay.manager;

import com.cube.share.base.utils.JacksonUtils;
import com.cube.share.delay.entity.ExecuteTask;
import com.cube.share.delay.entity.QrCode;
import com.cube.share.delay.message.DelayMessage;
import com.cube.share.delay.message.DelayMessageType;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * @author cube.li
 * @date 2021/9/22 17:12
 * @description 测试
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class DelayMessageManagerTest {

    @Resource(type = RabbitmqDelayMessageManager.class)
    DelayMessageManager delayMessageManager;

    @Test
    void add() throws JsonProcessingException {
        DelayMessage delayMessage = new DelayMessage();
        QrCode qrCode = new QrCode();
        qrCode.setConfigId("fadfdaf110");
        qrCode.setUrl("http://www.baidu.com");
        delayMessage.setBody(JacksonUtils.toJsonString(qrCode));
        delayMessage.setType(DelayMessageType.DELETE_QR_CODE);
        DelayMessage.DelayMessageProperties properties = new DelayMessage.DelayMessageProperties();
        properties.setExpire(10);
        properties.setTimeUnit(TimeUnit.SECONDS);
        delayMessage.setProperties(properties);
        delayMessageManager.add(delayMessage);


        DelayMessage delayMessage1 = new DelayMessage();
        QrCode qrCode1 = new QrCode();
        qrCode1.setConfigId("fadfdaf1405");
        qrCode1.setUrl("http://www.baidu.com");
        delayMessage1.setBody(JacksonUtils.toJsonString(qrCode1));
        delayMessage1.setType(DelayMessageType.DELETE_QR_CODE);
        DelayMessage.DelayMessageProperties properties1 = new DelayMessage.DelayMessageProperties();
        properties1.setExpire(5);
        properties1.setTimeUnit(TimeUnit.SECONDS);
        delayMessage1.setProperties(properties1);
        delayMessageManager.add(delayMessage1);

        DelayMessage delayMessage2 = new DelayMessage();
        ExecuteTask task = new ExecuteTask();
        task.setId(1L);
        delayMessage2.setBody(JacksonUtils.toJsonString(task));
        delayMessage2.setType(DelayMessageType.EXECUTE_TASK);

        DelayMessage.DelayMessageProperties properties2 = new DelayMessage.DelayMessageProperties();
        properties2.setExpire(9);
        properties2.setTimeUnit(TimeUnit.SECONDS);
        delayMessage2.setProperties(properties2);
        delayMessageManager.add(delayMessage2);

    }
}

控制台打印如下:

2021-09-22 19:27:10.482  INFO 15292 --- [           main] c.c.s.d.manager.DelayMessageManagerTest  : Started DelayMessageManagerTest in 4.747 seconds (JVM running for 5.866)

2021-09-22 19:27:15.702  INFO 3900 --- [ntContainer#0-1] c.c.s.d.m.RabbitmqDelayMessageManager    : 接收到延时消息:DelayMessage(body={"url":"http://www.baidu.com","configId":"fadfdaf1405"}, type=DELETE_QR_CODE, properties=null)
2021-09-22 19:27:15.705  INFO 3900 --- [ntContainer#0-1] c.c.s.d.h.QrCodeDelayMessageHandler      : 二维码延时消息处理中,message = DelayMessage(body={"url":"http://www.baidu.com","configId":"fadfdaf1405"}, type=DELETE_QR_CODE, properties=null)
2021-09-22 19:27:19.696  INFO 3900 --- [ntContainer#0-1] c.c.s.d.m.RabbitmqDelayMessageManager    : 接收到延时消息:DelayMessage(body={"id":1}, type=EXECUTE_TASK, properties=null)
2021-09-22 19:27:19.696  INFO 3900 --- [ntContainer#0-1] c.c.s.d.h.ExecuteTaskDelayMessageHandler : 任务延时消息处理中,message=DelayMessage(body={"id":1}, type=EXECUTE_TASK, properties=null)
2021-09-22 19:27:20.695  INFO 3900 --- [ntContainer#0-1] c.c.s.d.m.RabbitmqDelayMessageManager    : 接收到延时消息:DelayMessage(body={"url":"http://www.baidu.com","configId":"fadfdaf110"}, type=DELETE_QR_CODE, properties=null)
2021-09-22 19:27:20.695  INFO 3900 --- [ntContainer#0-1] c.c.s.d.h.QrCodeDelayMessageHandler      : 二维码延时消息处理中,message = DelayMessage(body={"url":"http://www.baidu.com","configId":"fadfdaf110"}, type=DELETE_QR_CODE, properties=null)

从日志可以看出,消息发送出5s、9s、10s后,成功对消息进行了消费,需要注意的是:设置的延时时间不能大于2^32-1毫秒、秒,否则消息会被立即消费,无法起到延时的效果;如果放入延时队列内的延时时间很长,应该将其放入Mysql中通过定时任务将一定期限内的延时消息让如延时队列内。

总结

本文主要提供了一种基于RabbitMq延时消息插件实现的延时队列,能够同时为多个业务场景按照消息指定延时时间,解决了定时任务调度时间不够准确以及原生RabbitMq不能(能指定但是实际不支持)按照消息自由指定延时时间的问题。
此外,基于Redisson也能够实现延时队列,也能够实现与RabbitMq延时消息插件一样的效果,我在下一篇文章会实现。
本文示例代码链接 https://gitee.com/li-cube/share/tree/master/delay-queue

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容