使用Redis实现一个可以指定时间发送消息的延时消息队列

版权申明

原创文章:本博所有原创文章,欢迎转载,转载请注明出处,并联系本人取得授权。
版权邮箱地址:banquan@mrdwy.com

使用场景

因为公司业务的原因,需要实现一个可以在指定时间执行一些任务的功能,比如订单发货通知,过期未付款订单删除,或者流程到期剩余24小时提醒等场景,需要支持由客户端发送一个任务,在指定才执行任务,并且允许客户端回收任务。

刚开始想到可以使用JDK的Timer、ScheduledExecutorService、或者调度框架Quartz等,使用定时器来执行,但是这就存在一个任务执行的实时性不够高的问题,不符合业务需求,另外由于业务量比较大,采用定时调度需要耗费巨大的资源来执行调度任务,比如定时器15分钟执行一次,那么15分钟内可能产生需要执行的任务太多了,调度服务执行不完,导致下次定时轮询到来时上一次轮询还没有结束。或者在不繁忙的时间,定时任务执行扫描任务库发现没有任务可以执行,这样会造成很多无意义的操作,无形中增加了数据库的压力。而且随着业务量的增长,这些情况会越来越明显,显然这个方案是行不通的。

那么就想到只有使用延时消息队列了,这个看上去比前面一个方法就要靠谱多了。

延时消息

需求点:
1、允许发送延时消息,可以支持延时多少时间发送,也可以指定具体的时间发送;
2、客户端可以通过消息的主键删除尚未发送成功消息;
3、需要支持消息消费者分布式部署,支持多个消费者同时消费消息;
4、支持消息的大量堆积,业务繁忙时允许消息发送适量延迟,但是必须保障不能丢消息;

然后这里我进行了一些成熟产品的选型,发现都无法完全满足上面的需求:
直接使用JDK自带的DelayQueue类,显然这个只支持单机运行,并不满足分布式消费,并且不能大量堆积消息,所有的消息都保存在计算机内存中,因此该方案否决。
使用市面上的成熟消息队列产品,主要有ActivitMq、RabbitMq、RocketMq、Kafka等产品,这些产品都能很好的满足延时消费和分布式的需求,但是都不支持回收消息功能,因此最后决定自行开发一个适合公司业务场景使用的延时消息队列。

实现方式

由于公司大量使用了Redis作为缓存数据库,因此相对来说使用起来比较方便,所以就想到了使用Redis的有序集合来实现延时消息队列,主要思路是将消费时间转换成时间戳,然后作为排序分值保存到有序队列中,每个队列代表一种业务场景,消费者循环从有序队列中获取最上一条数据,然后将分值与当前时间进行比较,如果大于当前时间,则执行消费动作,否则等待一段时间。
对于消息回收功能,则只需要将消息ID作为Redis Value值与并且业务ID关联保存起来,然后要回收消息的时候通过Redis API直接删除相应的Value就行了。

Redis有序集合数据结构

查找方式:首先通过业务类型,查到Redis的Key,然后通过Msgid找到具体哪条消息,最后删除消息。

主要实现代码

/**
客户端使用接口代码
**/
import java.util.Date;
import java.util.List;

/**
 * @author tcrow.luo
 * @date 2019/4/22.
 * 延时消息服务类
 */
public interface SysDelayQueueService {

    /**
     * 注册服务,会自动启动一个QueueWorker
     *
     * @param consumer
     */
    void register(Consumer consumer);

    /**
     * 注销服务(暂不支持注销)
     *
     * @param consumer
     */
    void unregister(Consumer consumer);

    /**
     * 暂停程序,关闭程序时调用关闭功能安全关闭
     */
    void shutdown();

    /**
     * 系统初始化时将队列初始化到redis队列中
     */
    void init();

    /**
     * 发送消息
     *
     * @param tag
     * @param keyword  关键词,可以用作查询消息,必须唯一,例如可以使用订单编号作为关键词
     * @param reqParam
     * @param execTime 执行事件
     */
    void send(String tag, String keyword, String reqParam, Date execTime);

    /**
     * 回收消息
     *
     * @param msgId
     */
    void recover(Integer msgId);

    /**
     * 回收消息
     *
     * @param tag
     * @param keyword
     */
    void recover(String tag, String keyword);

    /**
     * 通过关键词查找消息
     *
     * @param tag
     * @param keyword
     * @return
     */
    SysDelayQueue findByKeyword(String tag, String keyword);

}

/**
 * @author tcrow.luo
 * @date 2019/4/22.
 * 定义消息消费者的模型
 */
public interface Consumer {

    /**
     * 消费消息
     *
     * @param reqParam
     */
    void consume(String reqParam);

    /**
     * 获取订阅TAG消息,用于系统启动时自动将消费者注册到注册中心订阅对应TAG的消息
     *
     * @return
     */
    String getTag();

}

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ZSetOperations;
import com.alibaba.fastjson.JSONObject;
import java.util.Set;
//..........省略自定义类

/**
 * @author tcrow.luo
 * @date 2019/4/22.
 * 消费者工作类,系统启动时会自动启动对应消费者的工作线程
 */
@Slf4j
public class QueueWorker implements Runnable {

    private Consumer consumer;
    private RedisClient redis;
    private SysDelayQueueMapper sysDelayQueueMapper;
    private volatile boolean shutdown;

    public final static String QUEUE_WORKER = "QUEUE_WORKER";

    public QueueWorker(Consumer consumer) {
        this.consumer = consumer;
        this.redis = SpringContext.getApplicationContext().getBean(RedisClient.class);
        this.sysDelayQueueMapper = SpringContext.getApplicationContext().getBean(SysDelayQueueMapper.class);
        log.info("init [{}] queue worker success ....", consumer.getTag());
        shutdown = false;
    }

    public void shutdown() {
        this.shutdown = true;
    }

    @Override
    public void run() {
        String uuid;
        boolean lock;
        Set<ZSetOperations.TypedTuple<Object>> tuples;
        ZSetOperations.TypedTuple tuple;
        long now;
        String msgId;
        log.info("start [{}] queue loop ...", consumer.getTag());
        while (true) {
            //try{}catch{}防止线程因为意外错误而终止
            if (shutdown) {
                break;
            }
            try {
                now = System.currentTimeMillis() / 1000;

                tuples = redis.zrangeWithScores(consumer.getTag(), 0, 0);
                if (tuples == null || tuples.size() == 0) {
                    Threads.sleep(3000);
                    continue;
                }
                tuple = (ZSetOperations.TypedTuple) tuples.toArray()[0];
                uuid = UUIDUtil.getKey();
                if (now < tuple.getScore().longValue()) {
                    Threads.sleep(500);
                    continue;
                }
                msgId = (String) tuple.getValue();
                //只对消息本身加锁,允许多个线程订阅
                lock = redis.lock(QUEUE_WORKER + msgId, uuid, 3);
                if (!lock) {
                    Threads.sleep(500);
                    continue;
                }
                try {
                    SysDelayQueue sysDelayQueue = sysDelayQueueMapper.selectById(Integer.valueOf(msgId));
                    if (sysDelayQueue == null) {
                        log.error("数据异常,找不到对应的延迟消息,可能数据被异常删除,消息ID:[{}],消息类型[{}]", msgId, consumer.getTag());
                        redis.zrem(consumer.getTag(), tuple.getValue());
                        continue;
                    }
                    try {
                        consumer.consume(sysDelayQueue.getReqParam());
                    } catch (Exception e) {
                        log.error("完成延迟消息的消费,但是发生错误,消息体:[" + JSONObject.toJSONString(sysDelayQueue) + "]", e);
                    } finally {
                        //无论是否消费成功,都需要将消息设置为已消费,否则会造成消费者停止的问题
                        redis.zrem(consumer.getTag(), tuple.getValue());
                        sysDelayQueue.setMsgStatus(Const.Y);
                        sysDelayQueueMapper.updateById(sysDelayQueue);
                    }
                    log.info("完成延迟消息的消费,消息体:[{}]", JSONObject.toJSONString(sysDelayQueue));
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                } finally {
                    redis.unlock(consumer.getTag(), uuid);
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                Threads.sleep(5000);
            }

        }
    }
}


/**
 * @author tcrow.luo
 * @date 2019/4/22.
 * 消息消费者初始化类,通过Spring的getBeansOfType找到所有实现Consumer接口的Bean,然后将bean通过延时队列的Service方法注册成消费者
 */
@Slf4j
@Component
public class SysDelayQueueInit implements CommandLineRunner {

    @Autowired
    private SysDelayQueueService sysDelayQueueService;

    @Override
    public void run(String... args) {

        sysDelayQueueService.init();

        Map<String, Consumer> beansOfType = SpringContext.getApplicationContext().getBeansOfType(Consumer.class);

        Set<Map.Entry<String, Consumer>> entries = beansOfType.entrySet();
        for (Map.Entry<String, Consumer> entry : entries) {
            Consumer consumer = entry.getValue();
            sysDelayQueueService.register(consumer);
        }

    }

}

这里因为业务原因没有给出SysDelayQueueService接口的实现,自己实现也很简单,基本上send方法就是把消息ID保存到redis有序队列中,而recover则是从有序队列中删除对应的数据,需要注意的是,我这边把消息的请求参数保存在了其它关系型数据库中,没有保存到Redis里面,根据业务场景也可以直接把请求参数另外保存到Redis中,作为字符串保存,Key则直接设置成msgid就行了,这样都使用Redis效率更加高。

使用方式

1、首先实现Consumer 接口,一类业务场景实现一个Consumer接口,则在系统启动时会被自动注册成为消费者;
2、消费场景直接使用SysDelayQueueService.send方法发送消息

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容