执行引擎设计
1:sql表
CREATE TABLE `job_executor_map` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`enabled` tinyint(3) NOT NULL DEFAULT '1' COMMENT '1 - 可用,2 - 不可用',
`job_type` varchar(32) NOT NULL DEFAULT '' COMMENT '业务类型',
`executor` varchar(64) NOT NULL DEFAULT '' COMMENT '执行器',
`executor_name` varchar(32) NOT NULL DEFAULT '' COMMENT '执行器名称',
`sort` tinyint(3) NOT NULL DEFAULT '0' COMMENT '执行器的执行顺序',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`modify_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8 COMMENT='业务流程配置';
对应的实体类:JobExecutorMapModel 、 JobExecutorMapDaoImpl、JobExecutorMapDao
2:执行引擎(可支持回滚的)
@Service
public class ExecuteEngine {
// 执行器工厂,可根据job类型,捞出对应的所有顺序执行的执行器
@Autowired
private ExecutorFactory executorFactory;
private Logger logger = LoggerFactory.getLogger(ExecuteEngine.class);
/**
*
* @param request
*/
public ExecuteResult doExecute(ExecuteRequest request) {
// 执行器上下文环境
EngineContext context = new EngineContext();
// 根据job类型,获取执行器列表
List<Executor> list = executorFactory.getExecutorList(request.getJobType());
// 记录已经执行过的执行器,若失败了,可以回滚
Stack<Executor> executorStack = new Stack<>();
for (Executor executor : list) {
executorStack.push(executor);
// 执行执行器的doExecute方法
executor.doExecute(context, request);
// 失败,则回滚
if(context.isError()){
rollBack(executorStack, request);
}
}
ExecuteResult result = new ExecuteResult();
result.setResultObject(context.getData(ContextDataKeyEnum.EXECUTE_RESULT));
return result;
}
public void reload() {
processFactory.reload();
}
}
3:执行器工厂
/**
* 执行器工厂
*/
@Service
public class ExecutorFactory {
private Logger logger = LoggerFactory.getLogger(ExecutorFactory.class);
@Autowired
private JobExecutorMapDao jobExecutorMapDaoImpl;
/**
* executor列表,map的key对应job类型,value对应任务的所有顺序的执行器
*/
private Map<String, List<Executor>> executorMap;
/**
* 按照业务类型查找流程定义
*/
public List<Executor> getExecutorList(JobTypeEnum jobType) {
// 按照业务类型查找流程定义
List<Executor> result = this.executorMap.get(jobType.name());
if (result == null) {
logger.error("业务类型无对应的执行器,jobType=" + jobType.name());
}
return result;
}
public void reload() {
executorMap = null;
load();
}
// 执行工厂加载到bean中,初始化executorMap
@PostConstruct
public synchronized void load() {
if (executorMap == null) {
executorMap = Maps.newConcurrentMap();
}
Map<String, List<JobExecutorMapModel>> executorModelMap = getBizExecutorMap();
for (Map.Entry<String, List<JobExecutorMapModel>> entry : executorModelMap.entrySet()) {
List<JobExecutorMapModel> models = entry.getValue();
// 对执行器进行排序
Collections.sort(models, (BizExecutorMapModel o1, BizExecutorMapModel o2)
-> {
return o1.getSort().intValue() - o2.getSort().intValue();
});
List<Executor> executor = Lists.newArrayList();
for (BizExecutorMapModel model : models) {
// 根据反射,获取对应的执行器的javaBean
executor.add((Executor) SpringBeanUtil.getBean(model.getExecutor()));
}
executorMap.put(entry.getKey(), executor);
}
logger.info("[ExecutorFactory] executorMap:{}", executorMap);
}
private Map<String, List<JobExecutorMapModel>> getBizExecutorMap() {
Map<String, List<JobExecutorMapModel>> executorMap = Maps.newConcurrentMap();
// 从job_executor_map里面,捞出所有的JobExecutorMapModel
List<JobExecutorMapModel> bizExecutorMapModels = jobExecutorMapDaoImpl.queryAllExecutorMap();
// 基于JobType进行分类
for (BizExecutorMapModel bizExecutorMapModel : bizExecutorMapModels) {
List<JobExecutorMapModel> bems = executorMap.get(bizExecutorMapModel.getJobType());
if (CollectionUtils.isEmpty(bems)) {
bems = Lists.newArrayList();
}
bems.add(JobExecutorMapModel);
executorMap.put(bizExecutorMapModel.getJobType(), bems);
}
return executorMap;
}
}
4:执行器接口
public interface Executor {
public void doExecute(EngineContext engineContext, ExecuteRequest request);
5:执行器实现类(可细分为 ExecuteChecker、ExecuteAction两个抽象类)
/**
* 执行一些参数校验逻辑的执行器
**/
public abstract class ExecuteChecker implements Executor {
@Override
public final void doExecute(EngineContext engineContext, ExecuteRequest request) {
doCheck(engineContext, request);
}
/**
* doCheck方法,执行一些参数校验逻辑
*/
public abstract void doCheck(EngineContext engineContext, ExecuteRequest request);
}
/**
* 执行具体的业务逻辑的执行器
**/
public abstract class ExecuteAction implements Executor {
@Override
public final void doExecute(EngineContext engineContext, ExecuteRequest request) {
doAction(engineContext,request);
}
/**
* doAction方法,执行具体的业务逻辑
*/
public abstract void doAction(EngineContext engineContext, ExecuteRequest request);
}
6:具体的执行器实现类
@Service
public class action1 extends ExecuteAction {
private Logger logger = LoggerFactory.getLogger(action1.class);
@Autowired
@Override
public void doAction(EngineContext engineContext, ExecuteRequest request) {
......
}
}
@Service
public class checker1 extends ExecuteChecker {
private Logger logger = LoggerFactory.getLogger(checker1.class);
/**
* @param engineContext
* @param request
*/
@Override
public void doCheck(EngineContext engineContext, ExecuteRequest request) {
// 针对业务类型的参数校验
......
// 特殊参数校验
}
}