贼好用,冰河开源了这款精准定时任务和延时队列框架!!

写在前面

在实际工作中,很多小伙伴在开发定时任务时,会采取定时扫描数据表的方式实现。然而,这种方式存在着重大的缺陷:如果数据量大的话,频繁的扫描数据表会对数据库造成巨大的压力;难以支撑大规模的分布式定时任务;难以支持精准的定时任务;大量浪费CPU的资源;扫描的数据大部分是不需要执行的任务。那么,既然定时扫描数据表存在这么多的弊端,那么,有没有一种方式来解决这些问题呢?今天,冰河就带着他的开源项目mykit-delay来了!!开源地址:https://github.com/sunshinelyz/mykit-delay

在使用框架过程中如有任何问题,都可以添加冰河微信【sun_shine_lyz】进行交流。

文章已收录到https://github.com/sunshinelyz/technology-binghe

项目简述

Mykit体系中提供的简单、稳定、可扩展的延迟消息队列框架,提供精准的定时任务和延迟队列处理功能。

项目模块说明

  • mykit-delay-common: mykit-delay 延迟消息队列框架通用工具模块,提供全局通用的工具类
  • mykit-delay-config: mykit-delay 延迟消息队列框架通用配置模块,提供全局配置
  • mykit-delay-queue: mykit-delay 延迟消息队列框架核心实现模块,目前所有主要的功能都在此模块实现
  • mykit-delay-controller: mykit-delay 延迟消息队列框架Restful接口实现模块,对外提供Restful接口访问,兼容各种语言调用
  • mykit-delay-core: mykit-delay 延迟消息队列框架的入口,整个框架的启动程序在此模块实现
  • mykit-delay-test: mykit-delay 延迟消息队列框架通用测试模块,主要提供Junit单元测试用例

需求背景

  • 用户下订单后未支付,30分钟后支付超时
  • 在某个时间点通知用户参加系统活动
  • 业务执行失败之后隔10分钟重试一次

类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。

队列设计

整体架构设计如下图所示。

image

开发前需要考虑的问题

  • 及时性 消费端能按时收到
  • 同一时间消息的消费权重
  • 可靠性 消息不能出现没有被消费掉的情况
  • 可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
  • 可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
  • 高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
  • 消费端如何消费

当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。

简单定义一个消息数据结构

private String topic;/***topic**/
private String id;/***自动生成 全局惟一 snowflake**/
private String bizKey;
private long delay;/***延时毫秒数**/
private int priority;//优先级
private long ttl;/**消费端消费的ttl**/
private String body;/***消息体**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();

运行原理:

  • 用Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
  • 将id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
  • 使用timer实时监控zset有序列表中top 10的数据 。 如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
  • 客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。

客户端

因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。

  • 添加延时消息添加成功之后返回消费唯一ID POST /push {…..消息体}
  • 删除延时消息 需要传递消息ID GET /delete?id=
  • 恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
  • 恢复单个延时消息 需要传递消息ID GET /reStore/id
  • 获取消息 需要长连接 GET /get/topic

用nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。

目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。

消息可恢复

实现恢复的原理 正常情况下一般都是记录日志,比如mysql的binlog等。

这里我们直接采用mysql数据库作为记录日志。

目前创建以下2张表:

  • 消息表 字段包括整个消息体
  • 消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip

定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key。

支持消息恢复

假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。

当然恢复单个任务也可以这么干。

数据表设计
这里,我就直接给出创建数据表的SQL语句。

DROP TABLE IF EXISTS `mykit_delay_queue_job`;
CREATE TABLE `mykit_delay_queue_job` (
  `id` varchar(128) NOT NULL,
  `bizkey` varchar(128) DEFAULT NULL,
  `topic` varchar(128) DEFAULT NULL,
  `subtopic` varchar(250) DEFAULT NULL,
  `delay` bigint(20) DEFAULT NULL,
  `create_time` bigint(20) DEFAULT NULL,
  `body` text,
  `status` int(11) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),
  KEY `mykit_delay_queue_job_STATUS` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for mykit_delay_queue_job_log
-- ----------------------------
DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;
CREATE TABLE `mykit_delay_queue_job_log` (
  `id` varchar(128) NOT NULL,
  `status` int(11) DEFAULT NULL,
  `thread` varchar(60) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  `host` varchar(128) DEFAULT NULL,
  KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

关于高可用

分布式协调还是选用zookeeper。

如果有多个实例最多同时只能有1个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。

最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。

运行模式

  • 支持 master,slave (HA)需要配置mykit.delay.registry.serverList zk集群地址列表
  • 支持 cluster 会涉及到分布式锁竞争 效果不是很明显 分布式锁采用redissetNx实现
  • StandAlone

目前,经过测试,推荐使用master slave的模式,后期会优化Cluster模式

如何接入

为了提供一个统一的精准定时任务和延时队列框架,mykit-delay提供了HTTP Rest接口供其他业务系统调用,接口使用简单方便,只需要简单的调用接口,传递相应的参数即可。

消息体

以JSON数据格式参数 目前只提供了http协议

  • body 业务消息体
  • delay 延时毫秒 距createTime的间隔毫秒数
  • id 任务ID 系统自动生成 任务创建成功返回
  • status 状态 默认不填写
  • topic 标题
  • subtopic 保留字段
  • ttl 保留字段
  • createTime 创建任务时间 非必填 系统默认

添加任务

/push  
    POST application/json
{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}

删除任务

删除任务 需要记录一个JobId

/delete?jobId=xxx
   GET

恢复单个任务

用于任务错乱 脑裂情况 根据日志恢复任务

/reStoreJob?JobId=xxx
   GET

恢复所有未完成的任务

根据日志恢复任务

/reStore?expire=true
   GET

参数expire 表示是否需要恢复已过期还未执行的数据

清空队列数据

根据日志中未完成的数据清空队列中全部数据。清空之后 会删除缓存中的所有任务

/clearAll
 GET

客户端获取队列方式

目前默认实现了RocketMQActiveMQ的推送方式。依赖MQ的方式来实现延时框架与具体业务系统的耦合。

消息体中消息与RocketMQActiveMQ 消息字段对应关系

mykit-delay RocketMQ ActiveMQ 备注
topic topic topic 点对点发送队列名称或者主题名称
subtopic subtopic subtopic 点对点发送队列子名称或者主题子名称
body 消息内容 消息内容 消息内容

关于系统配置

延迟框架与具体执行业务系统的交互方式通过延迟框架配置实现,具体配置文件位置为mykit-delay-config项目下的resources/properties/starter.properties文件中。

测试

需要配置好数据库地址和Redis的地址 如果不是单机模式 也需要配置好Zookeeper

运行mykit-delay-test模块下的测试类io.mykit.delay.test.PushTest添加任务到队列中

启动mykit-delay-test模块下的io.mykit.delay.TestDelayQueue消费前面添加数据 为了方便查询效果 默认的消费方式是consoleCQ 控制台输出

扩展

支持zset队列个数可配置 避免大数据带来高延迟的问题。

近期规划

  • 分区(buck)支持动态设置
  • redis与数据库数据一致性的问题 (重要
  • 实现自己的推拉机制
  • 支持可切换实现方式 目前只是依赖Redis实现,后续待优化
  • 支持Web控制台管理队列
  • 实现消息消费TTL机制

如果这款开源框架对你有帮助,请小伙伴们打开github链接:https://github.com/sunshinelyz/mykit-delay ,给个Star,让更多的小伙伴看到,减轻工作中繁琐的扫描数据表的定时任务开发。也希望能够有越来越多的小伙伴参与这个开源项目,我们一起养肥它!!

image

好了,不早了,今天就到这儿吧,我是冰河,我们下期见!!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容