Zeus-Master-主要组件

MasterContext:

Master上下文

private Map<Channel, MasterWorkerHolder> workers=new ConcurrentHashMap<Channel, MasterWorkerHolder>();
private ApplicationContext applicationContext;
private Master master;
private Scheduler scheduler;
private Dispatcher dispatcher;//调度任务 jobId
private Queue<JobElement> queue=new ArrayBlockingQueue<JobElement>(10000);//调试任务  debugId
private Queue<JobElement> debugQueue=new ArrayBlockingQueue<JobElement>(1000);//手动任务  historyId
private Queue<JobElement> manualQueue=new ArrayBlockingQueue<JobElement>(1000);
private MasterHandler handler;
private MasterServer server;
private ExecutorService threadPool=Executors.newCachedThreadPool();
private ScheduledExecutorService schedulePool=Executors.newScheduledThreadPool(5);

内容说明:

  • workers: 维护了与所有worker的rpc通信管道,同时包含了worker的心跳信息,(内存、正在跑的任务、心跳时间戳、worker host标识)
  • applicationContext:spring 管理数据库对象;
  • master: master中初始化了对Dispatcher的监听,同时维护了两个线程,一个线程将任务池中的任务分配给当前最合适的worker机器,并启动;另个线程用了监测与worker的心跳连接,对于超时一分钟的则进行关闭;
  • scheduler: 开源框架quartz的调度api,对于周期性的任务调度,zeus使用quarz的调度策略来进行调度;
  • dispatcher: 事件分发类;
  • queue: 正常调度的任务池;
  • debugQueue: 调试任务的任务池;
  • manualQueue: 手动任务的任务池;
  • handler: master与web和worker的rpc通信封装类;
  • server: 主节点 master netty服务器启动;
  • threadPool: 主节点rpc通信线程池;
  • schedulePool: 主节点调度线程池;

JobController:

任务控制器:

private final String jobId;
private CacheJobDescriptor cache;
private JobHistoryManager jobHistoryManager;
private GroupManager groupManager;
private Master master;
private MasterContext context;

任务状态的控制器,每个任务对应一个 JobController;通过接受不同的事件,来转换任务的状态机状态,从而到达管理任务生命周期的作用;

Zeus 中设定的任务的触发方式分为三种:

/** * 触发任务执行的3种类型
 * 1: 定时执行
 * 2:手动执行(不产生连锁反应)
 * 3:手动恢复(产生连锁反应)
 *
 */
public enum TriggerType{
   SCHEDULE(1),MANUAL(2),MANUAL_RECOVER(3);
   public String toName(){
   if(id==1){
      return "自动调度";
   }else if(id==2){
      return "手动触发";
   }else if(id==3){
      return "手动恢复";
   }
   return "未知";
}

Zeus中设定的任务状态:

/**
 * WAIT:
 *     Job没有开始,或者Job依赖的任务没有全部完成
 * RUNNING
 *     Job正在运行中
 * SUCCESS
 *     Job运行成功(瞬间状态)
 * FAILED
 *     Job运行失败(瞬间状态)
 * @author zhoufang
 *
 */
public enum Status{
   WAIT("wait"),RUNNING("running"),SUCCESS("success"),FAILED("failed");

Zeus中设定任务的大的类型:

public enum JobRunType {
   MapReduce("main"), Shell("shell"), Hive("hive");

Zeus中设定的任务调度的类型:

public enum JobScheduleType {
   Independent(0), Dependent(1), CyleJob(2);

如果job控制器的任务调度类型是Independent,表示这个任务是独立的任务,仅仅依靠调度周期来触发,不会由其他的任务调度起来。而Dependent类型的则是说任务依赖于上层任务的触发。cycle 任务分为天任务和小时任务,

public void handleEvent(AppEvent event) {
   try {
      if (event instanceof JobSuccessEvent) {
         successEventHandle((JobSuccessEvent) event);
      } else if (event instanceof JobFailedEvent) {
         failedEventHandle((JobFailedEvent) event);
      } else if (event instanceof ScheduleTriggerEvent) {
         triggerEventHandle((ScheduleTriggerEvent) event);
      } else if (event instanceof JobMaintenanceEvent) {
         maintenanceEventHandle((JobMaintenanceEvent) event);
      } else if (event.getType() == Events.Initialize) {
         initializeEventHandle();
      }
   } catch (Exception e) {
      // catch所有的异常,保证本job的异常不影响其他job的运行
      ScheduleInfoLog.error("JobId:" + jobId + " handleEvent error", e);
   }
}

处理 JobSuccessEvent 事件:

private void successEventHandle(JobSuccessEvent event) {
   if (event.getTriggerType() == TriggerType.MANUAL) {
      return;
   }
   String eId = event.getJobId();
   JobDescriptor jobDescriptor = cache.getJobDescriptor();
   if (jobDescriptor == null) {
      autofix();
      return;
   }
   if (!jobDescriptor.getAuto()) {
      return;
   }
   if (jobDescriptor.getScheduleType() == JobScheduleType.Independent) {
      return;
   }
   if (jobDescriptor.getScheduleType() == JobScheduleType.CyleJob) {
      cycleJobSuccessHandle(event);
      return;
   }
   if (!jobDescriptor.getDependencies().contains(eId)) {
      return;
   }
   JobStatus jobStatus = null;
   synchronized (this) {
      jobStatus = groupManager.getJobStatus(jobId);
      JobBean bean = groupManager.getUpstreamJobBean(jobId);
      String cycle = bean.getHierarchyProperties().getProperty(
            PropertyKeys.DEPENDENCY_CYCLE);
      if (cycle != null && !"".equals(cycle)) {
         Map<String, String> dep = jobStatus.getReadyDependency();
         if ("sameday".equals(cycle)) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            String now = format.format(new Date());
            for (String key : new HashSet<String>(dep.keySet())) {
               String d = format.format(new Date(Long.valueOf(dep
                     .get(key))));
               if (!now.equals(d)) {
                  jobStatus.getReadyDependency().remove(key);
                  ScheduleInfoLog.info("JobId:" + jobId
                        + " remove overdue dependency " + key);
               }
            }
         }
      }
      ScheduleInfoLog.info("JobId:" + jobId
            + " received a successed dependency job with jobId:"
            + event.getJobId());
      ScheduleInfoLog.info("JobId:" + jobId + " the dependency jobId:"
            + event.getJobId() + " record it");
      jobStatus.getReadyDependency().put(eId,
            String.valueOf(new Date().getTime()));
      groupManager.updateJobStatus(jobStatus);
   }
   boolean allComplete = true;
   for (String key : jobDescriptor.getDependencies()) {
      if (jobStatus.getReadyDependency().get(key) == null) {
         allComplete = false;
         break;
      }
   }
   if (allComplete) {
      ScheduleInfoLog.info("JobId:" + jobId
            + " all dependency jobs is ready,run!");
      startNewJob(event.getTriggerType(), jobDescriptor, jobId);
   } else {
      ScheduleInfoLog.info("JobId:" + jobId
            + " some of dependency is not ready,waiting!");
   }
}

成功事件的触发是为了连锁效应对下游任务的处理,所以不处理触发类型是MANUAL的事件。cycle 任务收到一个任务的成功事件后,如果发现是自身任务的成功事件,并且自身没有任何的依赖,表示 这个 cycle 任务是一个独立的周期任务,需要运算下次开始时间。如果当前任务依赖于已经完成的任务,则在当前任务中保存已经完成的依赖,并检查当前任务是否可以执行。【该处的代码jobStatus.getReadyDependency().put(eId, event.getStatisEndTime()); 需要推敲一下】。任务依赖的周期都必须相同并且任务的结束时间也必须相同。如果周期不一样,因为只有天任务依赖小时任务,没有小时任务依赖天,所以可以判断自己肯定是天任务,而完成的是小时任务,因此需要判断该小时是否是23时即可。

处理 JobFailedEvent 事件:

private void failedEventHandle(JobFailedEvent event) {
   JobDescriptor jobDescriptor = cache.getJobDescriptor();
   if (jobDescriptor == null) {
      autofix();
      return;
   }
   if (!jobDescriptor.getAuto()) {
      return;
   }
   if (jobDescriptor.getDependencies().contains(event.getJobId())) {// 本Job依赖失败的Job
      if (event.getTriggerType() == TriggerType.SCHEDULE) {// 依赖的Job
                                                // 的失败类型是
                                                // SCHEDULE类型
         // 自身依赖的Job失败了,表明自身也无法继续执行,抛出失败的消息
         ZeusJobException exception = new ZeusJobException(event
               .getJobException().getCauseJobId(), "jobId:"
               + jobDescriptor.getId() + " 失败,原因是依赖的Job:"
               + event.getJobId() + " 执行失败", event.getJobException());
         ScheduleInfoLog.info("jobId:" + jobId
               + " is fail,as dependendy jobId:"
               + jobDescriptor.getId() + " is failed");
         // 记录进History日志
         JobHistory history = new JobHistory();
         history.setStartTime(new Date());
         history.setEndTime(new Date());
         history.setExecuteHost(null);
         history.setJobId(jobId);
         history.setTriggerType(event.getTriggerType());
         history.setStatus(Status.FAILED);
         history.getLog().appendZeusException(exception);
         history.setStatisEndTime(jobDescriptor.getStatisEndTime());
         history.setTimezone(jobDescriptor.getTimezone());
         history.setCycle(jobDescriptor.getCycle());
         history = jobHistoryManager.addJobHistory(history);
         jobHistoryManager.updateJobHistoryLog(history.getId(), history               .getLog().getContent());
         JobFailedEvent jfe = new JobFailedEvent(jobDescriptor.getId(),               event.getTriggerType(), history, exception);
         ScheduleInfoLog.info("JobId:" + jobId
               + " is fail,dispatch the fail event");
         // 广播消息
         context.getDispatcher().forwardEvent(jfe);
      }
   }
}

当依赖的一个Job失败时,本Job也自动失败了。

处理 ScheduleTriggerEvent 事件:

/**
 * 收到定时触发任务的事件的处理流程
 *
  * @param event */
private void triggerEventHandle(ScheduleTriggerEvent event) {
   String eId = event.getJobId();
   JobDescriptor jobDescriptor = cache.getJobDescriptor();
   if (jobDescriptor == null) {// 说明job被删除了,这是一个异常状况,autofix
      autofix();
      return;
   }   if (!eId.equals(jobDescriptor.getId())) {
      return;
   }
   ScheduleInfoLog.info("JobId:" + jobId
         + " receive a timer trigger event,statisTime is:"
         + jobDescriptor.getStatisEndTime());
   runJob(jobDescriptor);
}

收到触发任务事件,如果触发的任务是本任务的话,则启动该任务;

处理 JobMaintenanceEvent 事件:

private void maintenanceEventHandle(JobMaintenanceEvent event) {
   if (event.getType() == Events.UpdateJob
         && jobId.equals(event.getJobId())) {
      autofix();
   }
}

收到更新任务事件,如果触发的任务是本任务的话,则更新改任务的内存信息;

处理 Initialize 事件:

private void initializeEventHandle() {
   JobStatus jobStatus = groupManager.getJobStatus(jobId);
   if (jobStatus != null) {
      // 启动时发现在RUNNING 状态,说明上一次运行的结果丢失,将立即进行重试
      if (jobStatus.getStatus() == Status.RUNNING) {
         log.error("jobId=" + jobId
               + " 处于RUNNING状态,说明该JOB状态丢失,立即进行重试操作...");
         // 搜索上一次运行的日志,从日志中提取jobid 进行kill
         String operator = null;
         if (jobStatus.getHistoryId() != null) {
            JobHistory history = jobHistoryManager
                  .findJobHistory(jobStatus.getHistoryId());
            // 特殊情况下,有可能history查询为空
            if (history != null
                  && history.getStatus() == Status.RUNNING) {
               operator = history.getOperator();
               try {
                  JobContext temp = JobContext.getTempJobContext();
                  temp.setJobHistory(history);
                  new CancelHadoopJob(temp).run();
               } catch (Exception e) {
                  // 忽略
               }
            }
         }
         JobHistory history = new JobHistory();
         history.setIllustrate("启动服务器发现正在running状态,判断状态已经丢失,进行重试操作");
         history.setOperator(operator);
         history.setTriggerType(TriggerType.MANUAL_RECOVER);
         history.setJobId(jobId);
         context.getJobHistoryManager().addJobHistory(history);
         master.run(history);
      }
   }
   JobDescriptor jd = cache.getJobDescriptor();
   // 如果是定时任务,启动定时程序
   if (jd.getAuto() && jd.getScheduleType() == JobScheduleType.Independent) {
      String cronExpression = jd.getCronExpression();
      try {
         CronTrigger trigger = new CronTrigger(jd.getId(), "zeus",               cronExpression);
         JobDetail detail = new JobDetail(jd.getId(), "zeus",               TimerJob.class);
         detail.getJobDataMap().put("jobId", jd.getId());
         detail.getJobDataMap().put("dispatcher",               context.getDispatcher());
         context.getScheduler().scheduleJob(detail, trigger);
      } catch (Exception e) {
         if (e instanceof SchedulerException
               && "Based on configured schedule, the given trigger will never fire."
                     .equals(e.getMessage())) {
            // 定时器已经不会被触发了,关闭该job的自动调度功能
            jd.setAuto(false);
            try {
               groupManager.updateJob(jd.getOwner(), jd);
            } catch (ZeusException e1) {
               log.error("JobId:" + jobId + " 更新失败", e1);
            }
            cache.refresh();
         } else {
            log.error("JobId:" + jobId + " 定时程序启动失败", e);
         }
      }
   }   // 周期任务,并且没有依赖的情况下,直接根据开始时间执行
   if (jd.getAuto()
         && jd.getScheduleType() == JobScheduleType.CyleJob
         && (jd.getDependencies() == null || jd.getDependencies()
               .isEmpty())) {
      initCycleJob(jd);
   }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容