Zeus-Master-Main Components

MasterContext:

The master context.

private Map<Channel, MasterWorkerHolder> workers=new ConcurrentHashMap<Channel, MasterWorkerHolder>();
private ApplicationContext applicationContext;
private Master master;
private Scheduler scheduler;
private Dispatcher dispatcher;
private Queue<JobElement> queue=new ArrayBlockingQueue<JobElement>(10000);//scheduled jobs, keyed by jobId
private Queue<JobElement> debugQueue=new ArrayBlockingQueue<JobElement>(1000);//debug jobs, keyed by debugId
private Queue<JobElement> manualQueue=new ArrayBlockingQueue<JobElement>(1000);//manual jobs, keyed by historyId
private MasterHandler handler;
private MasterServer server;
private ExecutorService threadPool=Executors.newCachedThreadPool();
private ScheduledExecutorService schedulePool=Executors.newScheduledThreadPool(5);

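Field descriptions:

  • workers: holds the RPC channels to all workers, together with each worker's heartbeat information (memory, running jobs, heartbeat timestamp, worker host identifier);
  • applicationContext: the Spring context that manages the database-access objects;
  • master: registers the listeners on the Dispatcher and runs two threads: one assigns jobs from the job pools to the currently most suitable worker and starts them; the other monitors the heartbeat connections to the workers and closes any connection whose heartbeat has been missing for more than one minute;
  • scheduler: the scheduling API of the open-source Quartz framework; Zeus relies on Quartz's scheduling strategy for periodic jobs;
  • dispatcher: the event dispatcher;
  • queue: the pool of normally scheduled jobs;
  • debugQueue: the pool of debug jobs;
  • manualQueue: the pool of manually run jobs;
  • handler: wraps the RPC communication between the master and both the web tier and the workers;
  • server: starts the master's Netty server;
  • threadPool: the master's thread pool for RPC communication;
  • schedulePool: the master's scheduled thread pool;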
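The heartbeat check run by the master's second thread can be sketched as follows. This is a minimal illustration, not Zeus's code: the class HeartbeatMonitor, the host-string keys and the 10-second sweep period are assumptions. The real workers map is keyed by a Netty Channel and holds MasterWorkerHolder objects, and closing a worker would also close its channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the workers map: host -> last heartbeat time in millis. */
final class HeartbeatMonitor {
    static final long TIMEOUT_MILLIS = 60_000L; // one minute, as described for the master

    private final ConcurrentMap<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Records a heartbeat received from a worker. */
    void onHeartbeat(String host, long nowMillis) {
        lastHeartbeat.put(host, nowMillis);
    }

    /** Removes and returns every worker whose last heartbeat is older than the timeout. */
    List<String> sweep(long nowMillis) {
        List<String> closed = new ArrayList<>();
        for (ConcurrentMap.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            long last = e.getValue();
            // remove(key, value) only succeeds if no newer heartbeat arrived in the meantime
            if (nowMillis - last > TIMEOUT_MILLIS && lastHeartbeat.remove(e.getKey(), last)) {
                closed.add(e.getKey()); // the real master would close the worker's channel here
            }
        }
        return closed;
    }

    boolean isConnected(String host) {
        return lastHeartbeat.containsKey(host);
    }

    /** Runs sweep periodically; the 10-second period is an assumption. */
    ScheduledExecutorService start() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(() -> sweep(System.currentTimeMillis()), 0, 10, TimeUnit.SECONDS);
        return pool;
    }
}
```

Passing the current time into sweep keeps the timeout logic testable without waiting a real minute; start() returns the pool so the caller can shut it down.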

JobController:

The job controller:

private final String jobId;
private CacheJobDescriptor cache;
private JobHistoryManager jobHistoryManager;
private GroupManager groupManager;
private Master master;
private MasterContext context;

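It is the state controller of a job, and each job has its own JobController. By receiving different events, it moves the job's state machine from one state to another and so manages the job's life cycle.

Zeus defines three ways to trigger a job: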

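/**
 * The three ways a job execution can be triggered
 * 1: scheduled execution
 * 2: manual execution (no chain reaction)
 * 3: manual recovery (triggers a chain reaction)
 */
public enum TriggerType{
   SCHEDULE(1),MANUAL(2),MANUAL_RECOVER(3);
   private final int id;
   TriggerType(int id){
      this.id=id;
   }
   public int getId(){
      return id;
   }
   public String toName(){
      if(id==1){
         return "Automatic scheduling";
      }else if(id==2){
         return "Manual trigger";
      }else if(id==3){
         return "Manual recovery";
      }
      return "Unknown";
   }
}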

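The job statuses defined in Zeus:

/**
 * WAIT:
 *     The job has not started, or not all of the jobs it depends on have completed
 * RUNNING
 *     The job is running
 * SUCCESS
 *     The job ran successfully (transient state)
 * FAILED
 *     The job failed (transient state)
 * @author zhoufang
 *
 */
public enum Status{
   WAIT("wait"),RUNNING("running"),SUCCESS("success"),FAILED("failed");
   private final String id;
   Status(String id){
      this.id=id;
   }
   public String getId(){
      return id;
   }
}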

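The broad job types defined in Zeus:

public enum JobRunType {
   MapReduce("main"), Shell("shell"), Hive("hive");
   private final String id;
   JobRunType(String id) {
      this.id = id;
   }
   public String getId() {
      return id;
   }
}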

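The job schedule types defined in Zeus:

public enum JobScheduleType {
   Independent(0), Dependent(1), CyleJob(2);
   private final int type;
   JobScheduleType(int type) {
      this.type = type;
   }
   public int getType() {
      return type;
   }
}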

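If a job controller's schedule type is Independent, the job is standalone: it is triggered only by its schedule and is never started by another job. A Dependent job is triggered by the jobs upstream of it. A cycle job (CyleJob) is either a daily job or an hourly job.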
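successEventHandle, shown further down, reacts to a success event according to these schedule types. Its early-return branches condense into a small routing decision. The sketch uses simplified copies of the enums and a hypothetical Action type, and leaves out the null-descriptor autofix branch:

```java
import java.util.Set;

/** Hypothetical summary of the early returns in successEventHandle. */
final class SuccessRouting {
    enum TriggerType { SCHEDULE, MANUAL, MANUAL_RECOVER }
    enum JobScheduleType { Independent, Dependent, CyleJob }
    enum Action { IGNORE, CYCLE_HANDLE, RECORD_DEPENDENCY }

    static Action route(TriggerType trigger, boolean auto, JobScheduleType type,
                        Set<String> dependencies, String succeededJobId) {
        if (trigger == TriggerType.MANUAL) {
            return Action.IGNORE;            // manual runs never start a chain reaction
        }
        if (!auto) {
            return Action.IGNORE;            // automatic scheduling is switched off
        }
        if (type == JobScheduleType.Independent) {
            return Action.IGNORE;            // only its own schedule can start it
        }
        if (type == JobScheduleType.CyleJob) {
            return Action.CYCLE_HANDLE;      // delegated to cycleJobSuccessHandle
        }
        // Dependent job: only success events of its own dependencies matter
        return dependencies.contains(succeededJobId) ? Action.RECORD_DEPENDENCY : Action.IGNORE;
    }
}
```

Note that MANUAL_RECOVER is not filtered out, which matches the enum comment that a manual recovery does trigger a chain reaction.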

public void handleEvent(AppEvent event) {
   try {
      if (event instanceof JobSuccessEvent) {
         successEventHandle((JobSuccessEvent) event);
      } else if (event instanceof JobFailedEvent) {
         failedEventHandle((JobFailedEvent) event);
      } else if (event instanceof ScheduleTriggerEvent) {
         triggerEventHandle((ScheduleTriggerEvent) event);
      } else if (event instanceof JobMaintenanceEvent) {
         maintenanceEventHandle((JobMaintenanceEvent) event);
      } else if (event.getType() == Events.Initialize) {
         initializeEventHandle();
      }
   } catch (Exception e) {
      // catch every exception so that an error in this job does not affect other jobs
      ScheduleInfoLog.error("JobId:" + jobId + " handleEvent error", e);
   }
}
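The catch-all in handleEvent matters because each controller filters events by jobId itself, which suggests the Dispatcher broadcasts every event to all JobControllers in turn. A minimal sketch of that delivery loop, assuming string events and a plain list of controllers (IsolatedDispatch, Handler and the in-memory log are hypothetical, not Zeus classes):

```java
import java.util.ArrayList;
import java.util.List;

final class IsolatedDispatch {
    interface Handler {
        void handle(String event) throws Exception;
    }

    /** Mirrors JobController.handleEvent: every exception is caught and logged. */
    static final class Controller {
        final String jobId;
        final Handler handler;
        final List<String> log = new ArrayList<>(); // stands in for ScheduleInfoLog

        Controller(String jobId, Handler handler) {
            this.jobId = jobId;
            this.handler = handler;
        }

        void handleEvent(String event) {
            try {
                handler.handle(event);
            } catch (Exception e) {
                log.add("JobId:" + jobId + " handleEvent error: " + e.getMessage());
            }
        }
    }

    private final List<Controller> controllers = new ArrayList<>();

    void add(Controller c) {
        controllers.add(c);
    }

    /** Hypothetical forwardEvent: delivers the event to every controller in order. */
    void forwardEvent(String event) {
        for (Controller c : controllers) {
            c.handleEvent(event); // cannot throw, so later controllers always get the event
        }
    }
}
```

Without the try/catch inside handleEvent, one misconfigured job would abort the loop and silently starve every controller registered after it.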

Handling JobSuccessEvent:

private void successEventHandle(JobSuccessEvent event) {
   if (event.getTriggerType() == TriggerType.MANUAL) {
      return;
   }
   String eId = event.getJobId();
   JobDescriptor jobDescriptor = cache.getJobDescriptor();
   if (jobDescriptor == null) {
      autofix();
      return;
   }
   if (!jobDescriptor.getAuto()) {
      return;
   }
   if (jobDescriptor.getScheduleType() == JobScheduleType.Independent) {
      return;
   }
   if (jobDescriptor.getScheduleType() == JobScheduleType.CyleJob) {
      cycleJobSuccessHandle(event);
      return;
   }
   if (!jobDescriptor.getDependencies().contains(eId)) {
      return;
   }
   JobStatus jobStatus = null;
   synchronized (this) {
      jobStatus = groupManager.getJobStatus(jobId);
      JobBean bean = groupManager.getUpstreamJobBean(jobId);
      String cycle = bean.getHierarchyProperties().getProperty(
            PropertyKeys.DEPENDENCY_CYCLE);
      if (cycle != null && !"".equals(cycle)) {
         Map<String, String> dep = jobStatus.getReadyDependency();
         if ("sameday".equals(cycle)) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            String now = format.format(new Date());
            for (String key : new HashSet<String>(dep.keySet())) {
               String d = format.format(new Date(Long.valueOf(dep
                     .get(key))));
               if (!now.equals(d)) {
                  jobStatus.getReadyDependency().remove(key);
                  ScheduleInfoLog.info("JobId:" + jobId
                        + " remove overdue dependency " + key);
               }
            }
         }
      }
      ScheduleInfoLog.info("JobId:" + jobId
            + " received a succeeded dependency job with jobId:"
            + event.getJobId());
      ScheduleInfoLog.info("JobId:" + jobId + " the dependency jobId:"
            + event.getJobId() + " record it");
      jobStatus.getReadyDependency().put(eId,
            String.valueOf(new Date().getTime()));
      groupManager.updateJobStatus(jobStatus);
   }
   boolean allComplete = true;
   for (String key : jobDescriptor.getDependencies()) {
      if (jobStatus.getReadyDependency().get(key) == null) {
         allComplete = false;
         break;
      }
   }
   if (allComplete) {
      ScheduleInfoLog.info("JobId:" + jobId
            + " all dependency jobs are ready, run!");
      startNewJob(event.getTriggerType(), jobDescriptor, jobId);
   } else {
      ScheduleInfoLog.info("JobId:" + jobId
            + " some dependencies are not ready, waiting!");
   }
}

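Success events exist to propagate a chain reaction to downstream jobs, so events whose trigger type is MANUAL are ignored. Jobs whose automatic scheduling is off, and Independent jobs, also return early. If the current job depends on the job that just succeeded, the completed dependency is recorded in the current job's status, and the job then checks whether all of its dependencies are ready; if they are, it starts. When the upstream hierarchy property DEPENDENCY_CYCLE is "sameday", ready dependencies recorded on a day other than today are discarded first.

When a cycle job receives a success event and finds that the event is its own and that it has no dependencies, it is a standalone periodic job, and its next start time must be computed. [The code jobStatus.getReadyDependency().put(eId, event.getStatisEndTime()); at this point deserves closer scrutiny.] A job's dependencies must all have the same cycle, and their end times must match as well. If the cycles differ, then, because only daily jobs depend on hourly jobs and never the other way round, the current job must be a daily job and the completed one an hourly job, so it only needs to check whether that hour is 23:00.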
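The dependency bookkeeping described above can be sketched with java.time. This is an illustration under stated assumptions: DependencyCheck and its methods are hypothetical, the time zone is passed explicitly instead of relying on the JVM default that SimpleDateFormat uses, and hourlyRunCompletesDay assumes the hour is read from the upstream run's statisEndTime.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Map;

final class DependencyCheck {
    /** "sameday" cycle: drop ready dependencies that were recorded on another calendar day. */
    static void pruneNotSameDay(Map<String, String> ready, long nowMillis, ZoneId zone) {
        LocalDate today = Instant.ofEpochMilli(nowMillis).atZone(zone).toLocalDate();
        ready.entrySet().removeIf(e -> !Instant.ofEpochMilli(Long.parseLong(e.getValue()))
                .atZone(zone).toLocalDate().equals(today));
    }

    /** Same test as the allComplete loop: every declared dependency has a ready timestamp. */
    static boolean allReady(Collection<String> dependencies, Map<String, String> ready) {
        for (String dep : dependencies) {
            if (ready.get(dep) == null) {
                return false;
            }
        }
        return true;
    }

    /** Hypothetical: a daily job counts an hourly upstream run only once its 23:00 run is done. */
    static boolean hourlyRunCompletesDay(long statisEndTimeMillis, ZoneId zone) {
        return Instant.ofEpochMilli(statisEndTimeMillis).atZone(zone).getHour() == 23;
    }
}
```

Values in the ready map are epoch milliseconds stored as strings, mirroring String.valueOf(new Date().getTime()) in successEventHandle.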

Handling JobFailedEvent:

private void failedEventHandle(JobFailedEvent event) {
   JobDescriptor jobDescriptor = cache.getJobDescriptor();
   if (jobDescriptor == null) {
      autofix();
      return;
   }
   if (!jobDescriptor.getAuto()) {
      return;
   }
   if (jobDescriptor.getDependencies().contains(event.getJobId())) {// this job depends on the failed job
      if (event.getTriggerType() == TriggerType.SCHEDULE) {// the failed dependency was a SCHEDULE run
         // a job this job depends on has failed, so this job cannot run either: raise a failure
         ZeusJobException exception = new ZeusJobException(event
               .getJobException().getCauseJobId(), "jobId:"
               + jobDescriptor.getId() + " failed because the job it depends on, "
               + event.getJobId() + ", failed", event.getJobException());
         ScheduleInfoLog.info("jobId:" + jobId
               + " failed, as dependency jobId:"
               + event.getJobId() + " failed");
         // record it in the job history
         JobHistory history = new JobHistory();
         history.setStartTime(new Date());
         history.setEndTime(new Date());
         history.setExecuteHost(null);
         history.setJobId(jobId);
         history.setTriggerType(event.getTriggerType());
         history.setStatus(Status.FAILED);
         history.getLog().appendZeusException(exception);
         history.setStatisEndTime(jobDescriptor.getStatisEndTime());
         history.setTimezone(jobDescriptor.getTimezone());
         history.setCycle(jobDescriptor.getCycle());
         history = jobHistoryManager.addJobHistory(history);
         jobHistoryManager.updateJobHistoryLog(history.getId(), history
               .getLog().getContent());
         JobFailedEvent jfe = new JobFailedEvent(jobDescriptor.getId(),
               event.getTriggerType(), history, exception);
         ScheduleInfoLog.info("JobId:" + jobId
               + " failed, dispatching the failure event");
         // broadcast the event
         context.getDispatcher().forwardEvent(jfe);
      }
   }
}

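When a job that this job depends on fails under a SCHEDULE trigger, this job automatically fails as well: it writes a FAILED history record and broadcasts its own JobFailedEvent, so the failure propagates further downstream.

Handling ScheduleTriggerEvent: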
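Zeus propagates a failure one hop at a time by re-broadcasting a JobFailedEvent, and each controller checks its own dependency list. The net effect is a breadth-first walk over the reverse dependency graph. A sketch of that effect, assuming a hypothetical dependents map (job id to the ids of jobs that depend on it), which Zeus does not keep in this form:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FailureCascade {
    /** Every job that ends up FAILED when failedJobId fails under a SCHEDULE trigger. */
    static Set<String> failedDownstream(String failedJobId, Map<String, List<String>> dependents) {
        Set<String> failed = new LinkedHashSet<>(); // keeps the order in which jobs fail
        Deque<String> pending = new ArrayDeque<>();
        pending.add(failedJobId);
        while (!pending.isEmpty()) {
            String current = pending.poll();
            for (String child : dependents.getOrDefault(current, Collections.emptyList())) {
                // add() returns false for a job already failed, so diamonds are visited once
                if (failed.add(child)) {
                    pending.add(child);
                }
            }
        }
        return failed;
    }
}
```

The cascade keeps going in Zeus because the re-broadcast event reuses event.getTriggerType(), which is still SCHEDULE at every hop.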

/**
 * Handles the event fired when a job's timer triggers
 *
 * @param event
 */
private void triggerEventHandle(ScheduleTriggerEvent event) {
   String eId = event.getJobId();
   JobDescriptor jobDescriptor = cache.getJobDescriptor();
   if (jobDescriptor == null) {// the job was deleted; this is abnormal, so autofix
      autofix();
      return;
   }
   if (!eId.equals(jobDescriptor.getId())) {
      return;
   }
   ScheduleInfoLog.info("JobId:" + jobId
         + " receive a timer trigger event,statisTime is:"
         + jobDescriptor.getStatisEndTime());
   runJob(jobDescriptor);
}

On receiving a trigger event, the controller starts the job only if the triggered job is this job.

Handling JobMaintenanceEvent:

private void maintenanceEventHandle(JobMaintenanceEvent event) {
   if (event.getType() == Events.UpdateJob
         && jobId.equals(event.getJobId())) {
      autofix();
   }
}

On receiving an UpdateJob event for this job, the controller refreshes the job's in-memory information (through autofix).

Handling the Initialize event:

private void initializeEventHandle() {
   JobStatus jobStatus = groupManager.getJobStatus(jobId);
   if (jobStatus != null) {
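      // found in RUNNING state at startup: the last run's result was lost, so retry immediately
      if (jobStatus.getStatus() == Status.RUNNING) {
         log.error("jobId=" + jobId
               + " is in RUNNING state, so its status was lost; retrying immediately...");
         // find the last run's log, extract the job id from it and kill that job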
         String operator = null;
         if (jobStatus.getHistoryId() != null) {
            JobHistory history = jobHistoryManager
                  .findJobHistory(jobStatus.getHistoryId());
            // in rare cases the history lookup can return null
            if (history != null
                  && history.getStatus() == Status.RUNNING) {
               operator = history.getOperator();
               try {
                  JobContext temp = JobContext.getTempJobContext();
                  temp.setJobHistory(history);
                  new CancelHadoopJob(temp).run();
               } catch (Exception e) {
                  // ignore
               }
            }
         }
         JobHistory history = new JobHistory();
         history.setIllustrate("Found in RUNNING state at server startup; the status is considered lost, retrying");
         history.setOperator(operator);
         history.setTriggerType(TriggerType.MANUAL_RECOVER);
         history.setJobId(jobId);
         context.getJobHistoryManager().addJobHistory(history);
         master.run(history);
      }
   }
   JobDescriptor jd = cache.getJobDescriptor();
   // if this is a timed (cron) job, start its timer
   if (jd.getAuto() && jd.getScheduleType() == JobScheduleType.Independent) {
      String cronExpression = jd.getCronExpression();
      try {
         CronTrigger trigger = new CronTrigger(jd.getId(), "zeus",
               cronExpression);
         JobDetail detail = new JobDetail(jd.getId(), "zeus",
               TimerJob.class);
         detail.getJobDataMap().put("jobId", jd.getId());
         detail.getJobDataMap().put("dispatcher",
               context.getDispatcher());
         context.getScheduler().scheduleJob(detail, trigger);
      } catch (Exception e) {
         if (e instanceof SchedulerException
               && "Based on configured schedule, the given trigger will never fire."
                     .equals(e.getMessage())) {
            // the timer will never fire again, so turn off automatic scheduling for this job
            jd.setAuto(false);
            try {
               groupManager.updateJob(jd.getOwner(), jd);
            } catch (ZeusException e1) {
               log.error("JobId:" + jobId + " update failed", e1);
            }
            cache.refresh();
         } else {
            log.error("JobId:" + jobId + " failed to start the timer", e);
         }
      }
   }
   // cycle job with no dependencies: run it directly according to its start time
   if (jd.getAuto()
         && jd.getScheduleType() == JobScheduleType.CyleJob
         && (jd.getDependencies() == null || jd.getDependencies()
               .isEmpty())) {
      initCycleJob(jd);
   }
}
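The decisions made in initializeEventHandle condense into a per-job startup plan. A sketch with simplified enums and a hypothetical Step type; it only decides what to do and leaves out the Quartz calls, the history records and the kill of the leftover Hadoop job:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class StartupPlan {
    enum Status { WAIT, RUNNING, SUCCESS, FAILED }
    enum JobScheduleType { Independent, Dependent, CyleJob }
    enum Step { RECOVER_AS_MANUAL_RECOVER, REGISTER_CRON, INIT_CYCLE }

    /** persisted may be null when no JobStatus was stored for the job. */
    static List<Step> plan(Status persisted, boolean auto, JobScheduleType type,
                           Collection<String> deps) {
        List<Step> steps = new ArrayList<>();
        if (persisted == Status.RUNNING) {
            // the previous result was lost: rerun with TriggerType.MANUAL_RECOVER
            steps.add(Step.RECOVER_AS_MANUAL_RECOVER);
        }
        if (auto && type == JobScheduleType.Independent) {
            steps.add(Step.REGISTER_CRON); // CronTrigger + TimerJob in the real code
        }
        if (auto && type == JobScheduleType.CyleJob && (deps == null || deps.isEmpty())) {
            steps.add(Step.INIT_CYCLE);    // initCycleJob in the real code
        }
        return steps;
    }
}
```

Recovery is independent of the schedule type, so a RUNNING Independent job both reruns and gets its cron timer registered.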