tbschedule源码解读
tbschedule部署包括两部分,一个是负责配置管理的后台程序,一个是客户端接入包,这两个程序依赖zk进行信息交互。
zk数据的大致结构
factory部分:
/app1/factory
/app1/factory/facotoryUUID1
/app1/strategy
/app1/strategy/strategy1
/app1/strategy/strategy1/factoryUUID1
可以有多个facotory,每个factory对应一个客户端启动的TBScheduleManagerFactory
实例,每个JVM可以有多个factory实例,factory实例也可以存在于不同的JVM中。
strategy是在后台配置的任务策略,每个factory启动时候回去检查自己能处理哪几个strategy,如果能处理则在/app1/strategy/strategy1/
路径下注册自己,注册的这个信息在tbschedule源码里叫做FactoryRunningInfo
。
ScheduleServer部分:
/app1/baseTaskType
/app1/baseTaskType/task1
/app1/baseTaskType/task1/task1
/app1/baseTaskType/task1/task1/server
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
/app1/baseTaskType/task1/task1/taskItem
/app1/baseTaskType/task1/task1/taskItem/taskItem1
/app1/baseTaskType/task1/task1/taskItem/taskItem1/cur_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/deal_desc
/app1/baseTaskType/task1/task1/taskItem/taskItem1/parameter
/app1/baseTaskType/task1/task1/taskItem/taskItem1/req_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/sts
task1是在后台配置的任务。
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
表示可以用来处理任务的调度器,每个factory实例可以有多个ScheduleServer实例。
/app1/baseTaskType/task1/task1/taskItem
表示配置任务时,每个任务可以拆分成几个小的任务项。该节点的子节点,表示这个任务项运行时的信息,例如cur_server
表示这个taskItem正在被哪个ScheduleServer处理。这些在tbschedule源码里也叫作runningInfo。
核心类图
TBScheduleManagerFactory
factory实例对象,管理这个factory内部所有的事情。ZKManager
负责与zk之间的连接,数据交换。IScheduleDataManager
负责/app1/baseTaskType
及其子节点所有数据模型维护。ScheduleDataManger4ZK
负责/app1/factory``/app1/strategy
及其字节点数据模型维护。IStrategyTask
每个实例代表一个线程组,每个strategy可对应多个IStrategyTask
实例,来真正处理配置的任务。
关于这几个类的组合关系如下图:
一个Factory处理多个strategy,每个strategy下有多个
IStrategyTask
对象。TBScheduleManager
实现IStrategyTask
接口,一个TBScheduleManager
实例跟ScheduleServer
、ScheduleProcessor
、IScheduleTaskDeal
的关系都是一比一的关系。ScheduleServer
是针对某一个task的的调度器。IScheduleTaskDeal
是我们自己代码里需要实现的任务对象。ScheduleProcessor
是处理任务的多线程任务处理器,代表一个线程组。可以包含多个线程,线程的最大数量取决于后台配置的task身上的threadNum
字段。一个Factory有多个
IStrategyTask
的原因是,任务需要分片处理,每个分片对应一个IStrategyTask
实例。一个
ScheduleProcessor
有多个Thread的原因是,一个任务分片下一次可以取出多个任务,开启多线程可以并发处理这些任务。
初始化流程
整个初始化过程大量使用Thread、Timer,很多工作都是异步进行的,且这些线程之间通过了状态对象、锁等方式进行了协调。
整个初始化过程粗略来看包括以下几步:
- 创建ZKManager对象
- 启动初始化线程
InitalThread
,然后立即返回
接着便是InitalThread
异步做的初始化工作: - 准备好ZKManager、ScheduleDataManager4ZK、ScheduleStrategyDataManager4ZK对象
- 启动定时Timer对象
ManagerFactoryTimerTask
接着便是ManagerFactoryTimerTask
定时执行的工作,主要是去扫描strategy配置,重新分配factory去处理这些strategy。分配完factory,会创建StrategyTask进行任务的处理。
factory刷新工作详解
整个过程源码入口在ManagerFactoryTimerTask#run()
中,而主要的逻辑集中在TBScheduleManagerFactory#refresh()
。这里不去关心stop factory的逆向流程,只来看正向流程,见TBScheduleManagerFactory #reRegisterManagerFactory
。
- 遍历strategy,重新计算factory实例跟strategy的匹配关系
- 找到当前factory实例不能处理的strategy,并停止掉正在运行的
StrategyTask
- 遍历跟当前factory实例相关的strategy,选举出每个strategy的leader factory实例,由leader重新计算每个factory实例能够分到的reqNum,即根据strategy身上的
assignNum``numOfSingleServer
,将assignNum
平分给每个factory实例。 - 调整当前factory实例分配到每个strategy的的StrategyTask的数量,确保数量等于上一步分配给自己的数量。
factory线程组数量分配算法
见ScheduleUtil#assignTaskNumber
/**
* 分配任务数量
* @param serverNum 总的服务器数量
* @param taskItemNum 任务项数量
* @param maxNumOfOneServer 每个server最大任务项数目
* @return
*/
public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){
int[] taskNums = new int[serverNum];
int numOfSingle = taskItemNum / serverNum;
int otherNum = taskItemNum % serverNum;
//20150323 删除, 任务分片保证分配到所有的线程组数上。 开始
// if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) {
// numOfSingle = maxNumOfOneServer;
// otherNum = 0;
// }
//20150323 删除, 任务分片保证分配到所有的线程组数上。 结束
for (int i = 0; i < taskNums.length; i++) {
if (i < otherNum) {
taskNums[i] = numOfSingle + 1;
} else {
taskNums[i] = numOfSingle;
}
}
return taskNums;
}
TBScheduleManagerStatic的初始化流程
- 找到task配置的用户实现的
IScheduleTaskDeal
对象 - 将当前ScheduleServer实例注册到
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
位置 - 启动心跳Timer
HearBeatTimerTask
- 启动初始化线程
心跳Timer
这里主要做的事情就是重新将taskItem分配到每个SchedueServer,源码位置在TBScheduleManagerStatic#assignScheduleTask()
。首先选举出当前ScheduleServer对应的task对应的所有ScheduleServer实例,选举出一个leader,由leader进行分配工作。
- 等到初始化线程完成initialRunningInfo的工作
- clearTaskItem,遍历所有taskItem,查看对应的cur_server是否还能找到,找不到则将cur_server置为null
- assignTaskItem,给每个taskItem分配合适的ScheduleServer实例。
初始化线程
- initialRunningInfo,由当前task的leader ScheduleServer实例初始化这个task下所有的taskItem子节点的数据,此时还没有分配每个taskItem由哪个ScheduleServer实例执行(见心跳Timer)
- getCurrentScheduleTaskItemListNow,重新加载当前ScheduleServer能处理的taskItem项目
- computerStart,创建两个Timer,一个用来计算任务下次执行开始时间,一个用来计算任务下次终止执行时间。停止跟恢复通过
TBScheduleManager
身上的isPauseSchedule
字段来标识。 - 恢复的时候去创建
TBScheduleProcessorSleep``TBScheduleProcessorNotSleep
对象;停止的时候,会将已经在执行的任务处理完,但是缓存在队列中待执行的任务将被丢弃。
TBScheduleProcessorSleep多线程工作原理
启动task配置的threadNum数量的线程去处理任务。由其中某一个线程去获取任务,将入taskList队列中,所有的线程从这个队列中获取任务执行,如果是Multi任务,可以一次取多个任务执行。在一个线程获取任务的过程中,其他线程处于休眠状态,任务获取完毕唤醒其他线程。获取任务代码在TBScheduleProcessorSleep#loadScheduleData
,每次获取都是调用一次IScheduleTaskDeal
对象selectTasks
方法获取一批任务放到taskList中。
两次loadScheduleData
有一个休眠时间,即在task上配置的SleepTimeInterval。
一旦TBScheduleProcessorSleep启动了,会一直循环执行,知道PauseTimer让其停止,如果你没有配置结束时间,则不会停止,而是一直运行;也可以通过后台配置将任务停止。
总结
tbschdule通过任务分片,将一个任务分配给多个线程组(即ScheduleServer实例)执行,这些线程组可以分布在相同或者不同的JVM上。而每个线程组支持多线程处理某一个分片的任务。
tbschedule同时支持失效任务转移功能,并且可以通过管理后台对任务进行调度管理。
不过官方文档实在太少。
参考:
tbschedule
关于TbSchedule任务调度管理框架的整合部署