序
Activiti 自己实现了一套 job 机制,解决了集群环境下 job 执行的若干问题:
- 集群中如何保证一个 job 只被一台机器执行
- job 在处理过程中失败(抛出异常或 server 重启)时,如何处理这些 failed job,让它们重新执行
ACT_RU_JOB表结构
字段名称 | 字段描述 | 数据类型 | 主键 | 为空 | 说明
---|---|---|---|---|---
ID_ | | varchar(64) | √ | |
REV_ | | integer | | √ |
TYPE_ | | varchar(255) | | |
LOCK_EXP_TIME_ | 锁定释放时间 | timestamp(3) | | √ |
LOCK_OWNER_ | 锁持有者 | varchar(255) | | √ |
EXCLUSIVE_ | | boolean | | √ |
EXECUTION_ID_ | | varchar(64) | | √ |
PROCESS_INSTANCE_ID_ | | varchar(64) | | √ |
PROC_DEF_ID_ | | varchar(64) | | √ |
RETRIES_ | 剩余重试次数 | integer | | √ |
EXCEPTION_STACK_ID_ | 异常id | varchar(64) | | √ |
EXCEPTION_MSG_ | 异常信息 | varchar(4000) | | √ |
DUEDATE_ | 到期时间 | timestamp(3) | | √ |
REPEAT_ | 重复 | varchar(255) | | √ |
HANDLER_TYPE_ | 处理类型 | varchar(255) | | √ |
HANDLER_CFG_ | | varchar(4000) | | √ |
TENANT_ID_ | | varchar(255) | | √ |
查询待处理job
-- Candidate jobs a node may try to acquire.
SELECT RES.*
  FROM ACT_RU_JOB RES
  LEFT OUTER JOIN ACT_RU_EXECUTION PI
    ON PI.ID_ = RES.PROCESS_INSTANCE_ID_
 -- still has retries left
 WHERE RES.RETRIES_ > 0
   -- no due date, or already due (the bound parameter is presumably "now")
   AND (RES.DUEDATE_ IS NULL OR RES.DUEDATE_ <= ?)
   -- not locked, or the previous lock has expired
   AND (RES.LOCK_OWNER_ IS NULL OR RES.LOCK_EXP_TIME_ <= ?)
   -- not tied to an execution, or its process instance has SUSPENSION_STATE_ = 1
   -- (presumably "active", i.e. not suspended)
   AND (RES.EXECUTION_ID_ IS NULL OR PI.SUSPENSION_STATE_ = 1);
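查询条件为:
- retries 值 > 0
- duedate 为空,或者 duedate 小于等于当前时间
- lock_owner 为空,或者 lock_exp_time 小于等于当前时间(即锁已过期,原持有者可能已宕机或重启,其他机器可以重新抢占)
- job 没有关联 execution,或者其所属流程实例的 SUSPENSION_STATE_ = 1(流程实例处于激活状态,未被挂起)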
抢占任务
AcquireJobsCmd
/**
 * Locks {@code job} for {@code lockOwner}: records the owner, then sets the lock expiration
 * to the engine clock's current time plus {@code lockTimeInMillis}.
 *
 * @param commandContext context used to reach the process engine's configured clock
 * @param job the job being acquired
 * @param lockOwner identifier stored as the job's lock owner
 * @param lockTimeInMillis how long, in milliseconds, the lock stays valid
 */
protected void lockJob(CommandContext commandContext, JobEntity job, String lockOwner, int lockTimeInMillis) {
    job.setLockOwner(lockOwner);

    // Base the expiry on the engine-configured clock rather than the JVM system time.
    Calendar expiry = new GregorianCalendar();
    expiry.setTime(commandContext.getProcessEngineConfiguration()
            .getClock()
            .getCurrentTime());
    expiry.add(Calendar.MILLISECOND, lockTimeInMillis);

    job.setLockExpirationTime(expiry.getTime());
}
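设置 lock_owner,并把 lock_exp_time 设为引擎时钟的当前时间加上 lockTimeInMillis。
lockTimeInMillis 例如 5 * 60 * 1000,即 5 分钟。如果持有者在锁过期前既没有执行完、也没有释放锁(比如 server 重启),那么锁过期后该 job 会重新满足 LOCK_EXP_TIME_ <= 当前时间 的查询条件,被其他机器抢占执行。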
job执行成功
/**
 * Removes every job returned by {@code getJobs()} by calling {@code delete()} on it as a
 * {@code JobEntity}.
 */
private void removeJobs() {
    for (Job acquired : getJobs()) {
        JobEntity entity = (JobEntity) acquired;
        entity.delete();
    }
}
执行成功后,直接将 job 从 job 表(ACT_RU_JOB)中删除
job执行失败
// Fallback retry handling, used when there is no activity or the activity has no
// failedJobRetryTimeCycle value: consume one retry, release the lock, and push the
// due date back so the job is attempted again later.
// NOTE(review): the log text below has typos ("activitiy", "Timer" vs the getter's "Time")
// and an unbalanced quote after jobId; left as-is here because it is runtime output.
if (activity == null || activity.getFailedJobRetryTimeCycleValue() == null) {
log.debug("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries.");
// One attempt used up; once this reaches 0 the job no longer matches the
// RETRIES_ > 0 condition of the acquisition query.
job.setRetries(job.getRetries() - 1);
// Clear owner and expiration so the job is unlocked and any node can acquire it again.
job.setLockOwner(null);
job.setLockExpirationTime(null);
if (job.getDuedate() == null) {
// add wait time for failed async job
// No previous due date: delay by the async failed-job wait time. A null base date is
// passed; presumably calculateDueDate then counts from the current time (not shown — confirm).
job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getAsyncFailedJobWaitTime(), null));
} else {
// add default wait time for failed job
// Existing due date is passed as the base; presumably the default wait is added on top
// of it (calculateDueDate is not shown — confirm).
job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getDefaultFailedJobWaitTime(), job.getDuedate()));
}
}
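- retries 减 1(减到 0 后不再满足 RETRIES_ > 0 的查询条件,job 不会再被抢占执行)
- 清空 lock_owner
- 清空 lock_exp_time(即释放锁,任何机器都可以重新抢占该 job)
- 重新设置 duedate,延时重试:原 duedate 为空(异步 job)时使用 asyncFailedJobWaitTime,否则在原 duedate 的基础上使用 defaultFailedJobWaitTime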
/**
 * Default wait time, in seconds, before a failed async job is retried. Defaults to 10.
 *
 * <p>Presumably exposed via {@code getAsyncFailedJobWaitTime()}, which the failed-job
 * handling uses to compute a new due date when the failed job had none — confirm.
 */
protected int asyncFailedJobWaitTime = 10;