1.启动过程流程图
流程图详细地描述了各个作业细节的执行过程,看上去流程非常复杂,其主要的功能点为:判断作业是否可执行,判断作业是否分片执行,作业执行状态监听,作业失效转移等。下面我们结合代码一步步窥探他的执行过程。
2.核心源码分析
2.1 作业入口
/**
* Elastic Job Lite提供的Quartz封装作业.
*
* @author zhangliang
*/
public class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
elasticJob.execute();
}
}
LiteJob实现了Quartz的Job接口,并且持有elasticJob的实现类,通过代理的方式实现了ElasticJob与Quartz的无缝衔接;
【亮点】这是一种典型的代理模式,其好处在于体验上完全与Quartz的Job一致,并且遵循了代码的开闭原则,使得代码具有很好地拓展性:例如ElasticJob接口有SimpleJob,DataFlowJob或者用户自定义的多种实现类,因此具有很好地拓展性。
2.2 AbstractElasticJob抽象类及其原理
/**
* 弹性化分布式作业的基类.
*
* @author zhangliang
* @author caohao
*/
@Slf4j
public abstract class AbstractElasticJob implements ElasticJob {
//具体的业务实现放在jobFacade门面类中实现,简化代码复杂度
private JobFacade jobFacade;
@Override
public final void execute() {
log.trace("Elastic job: job execute begin.");
//判断与注册中心时间差是否在允许范围内
jobFacade.checkMaxTimeDiffSecondsTolerable();
//获取分片上下文
JobExecutionMultipleShardingContext shardingContext = jobFacade.getShardingContext();
//若前面的任务仍在执行,则设置错过执行标记,延迟执行
if (jobFacade.misfireIfNecessary(shardingContext.getShardingItems())) {
log.debug("Elastic job: previous job is still running, new job will start after previous job completed. Misfired job had recorded.");
return;
}
//清除作业上次执行的信息
jobFacade.cleanPreviousExecutionInfo();
try {
//各监听器执行job执行前方法
jobFacade.beforeJobExecuted(shardingContext);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobException(cause));
}
//执行具体的job业务逻辑
executeJobInternal(shardingContext);
log.trace("Elastic job: execute normal completed, sharding context:{}.", shardingContext);
while (jobFacade.isExecuteMisfired(shardingContext.getShardingItems())) {
log.trace("Elastic job: execute misfired job, sharding context:{}.", shardingContext);
jobFacade.clearMisfire(shardingContext.getShardingItems());
executeJobInternal(shardingContext);
log.trace("Elastic job: misfired job completed, sharding context:{}.", shardingContext);
}
//按需失效转移
jobFacade.failoverIfNecessary();
try {
//执行监听后事件
jobFacade.afterJobExecuted(shardingContext);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobException(cause));
}
log.trace("Elastic job: execute all completed.");
}
private void executeJobInternal(final JobExecutionMultipleShardingContext shardingContext) {
if (shardingContext.getShardingItems().isEmpty()) {
log.trace("Elastic job: sharding item is empty, job execution context:{}.", shardingContext);
return;
}
//注册任务执行信息
jobFacade.registerJobBegin(shardingContext);
try {
executeJob(shardingContext);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobException(cause));
} finally {
// TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
jobFacade.registerJobCompleted(shardingContext);
}
}
protected abstract void executeJob(final JobExecutionMultipleShardingContext shardingContext);
@Override
public void handleJobExecutionException(final JobException jobException) {
log.error("Elastic job: exception occur in job processing...", jobException.getCause());
}
@Override
public final JobFacade getJobFacade() {
return jobFacade;
}
@Override
public final void setJobFacade(final JobFacade jobFacade) {
this.jobFacade = jobFacade;
}
【亮点】外观模式传送门
上面的代码中应用到了外观模式(Facade),AbstractElasticJob持有jobFacade对象,Elasticjob负责统筹整体的job执行流程但无需关注业务的具体实现,转而将复杂的业务处理逻辑交由jobFacade中的方法进行处理,从而将job与具体的业务逻辑抽离出来方便阅读和拓展。