spring原生定时任务框架的缺陷
- 不支持分片任务:处理有序数据时,多机器分片执行不同任务,集群部署执行存在任务重复执行的问题
- 不支持任务生命周期统一管理:不重启服务的情况下启停任务,修改任务参数
- 不支持失败重试:出现异常后任务终止,不能根据任务状态控制任务重新执行
- 无报警机制:任务执行失败报警
- 任务数据难以统计:任务量大时,无法统计任务的执行情况
与elastic-job的区别
elastic-job设计初衷是应对高并发调度场景,是依赖于zk的,通过zk选举机制选举出一个主服务器。是无中心化的分布式调度框架,可扩展性和可用性强,但使用和维护更复杂。
xxl-job相反,它是通过一个调度化的调度平台,调度多个执行器执行任务。调度中心还集成了一个任务管理界面,轻量级,与springboot集成较好,使用方便,维护成本较低,还有失败的邮件告警。基于DB锁保证分布式调度的一致性,执行器过多会对数据库造成较大压力,但一般场景下,执行器并不多,任务执行并不频繁。
总体架构
概要设计
调度中心
JobScheduleHelper
任务的调度通过调度线程(scheduleThread)和时间轮线程(ringThread)完成。
调度线程
- 小于当前时间并且大于当前时间-5s的,是短时间过期的任务,直接调度执行器执行
- 小于当前时间+5s并且大于当前时间的,是马上要执行的任务,放入时间轮,交给时间轮线程处理
- 小于当前时间-5s的,是过期时间比较长的,根据任务配置的misfire策略,忽略调度或者补偿调度
//thread loop间隔时间:不间隔、1s以内或5s以内
// 如果时间大于1s,立刻触发下一次调度,防止错过任务的执行。
if (cost < 1000) { // 如果时间小于1s,休眠一段时间
try {
//preReadSuc==true,为1s减去System.currentTimeMillis()%1000)
//如果preReadSuc==false,说明将来5s内都不会有任务执行,为5s去减
//System.currentTimeMillis()%1000是当前秒已经经过的毫秒数,减去它表示当前秒剩余的毫秒数,这样可以确保线程在整数秒唤醒
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
时间轮线程
//thread loop间隔时间1s以内
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
for (int i = 0; i < 2; i++) {
//如果nowSecond=0,那么上一秒就是59
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
JobRegistryHelper
- 从xxl_job_group查询自动注册的执行器
- 查询失联的执行器,即90s内无心跳的,并将它们从注册表xxl_job_registry移除
- 查询在线的执行器,即90s内有心跳的,并将它们的IP地址和端口号信息更新到执行器表xxl_job_group
//thread loop 30s间隔时间
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
路由策略
一致性hash
普通hash算法的问题:分布式系统中,机器扩容或缩容时会导致大面积哈希映射失效。而一致性哈希在机器扩容或缩容时,只迁移顺时针方向相邻的2台机器的数据。
算法描述:散列地址空间是长度为2^32的哈希环,机器地址映射到哈希环上,缓存key的哈希值在哈希环位置往顺时针方向的第一个机器地址,为缓存映射的机器地址。
存在的问题:数据倾斜,节点雪崩。当机器数很少时,映射地址过于聚集,导致大部分数据映射到其中一台机器,造成数据倾斜。其中一台机器数据过多压力过大而宕机后,数据迁移到集群另一台机器,使得另一台宕机,连锁反应最终导致整个集群宕机的现象,叫节点雪崩。
解决方案:使用虚拟节点,每台机器对应多个虚拟节点,映射到虚拟节点的数据对应真实物理机器,虚拟节点均匀分布在哈希环上
public String hashJob(int jobId, List<String> addressList) {
//hash环用红黑树实现,可以很方便的获取以指定key为起始的子map
//红黑树可以看成有序的链表,
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}