The Zeus Job System

diagram5.png

Job

public interface Job {
      Integer run() throws Exception;
      void cancel();
      JobContext getJobContext();
      boolean isCanceled();
}
  • run: the execution entry point
  • cancel: the entry point for cancelling the job
  • getJobContext: returns the job's context

Job logs are stored in the database.
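
To make the contract concrete, here is a minimal sketch of what an implementation of this interface might look like. The one-field JobContext and the EchoJob class are hypothetical stand-ins invented for illustration; the real Zeus classes carry far more state.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class JobInterfaceSketch {
    // Hypothetical stand-in for Zeus's JobContext.
    static class JobContext {
        private final String workDir;
        JobContext(String workDir) { this.workDir = workDir; }
        String getWorkDir() { return workDir; }
    }

    // The interface as quoted in the article.
    interface Job {
        Integer run() throws Exception;
        void cancel();
        JobContext getJobContext();
        boolean isCanceled();
    }

    // Illustrative Job: returns 0 unless it was canceled before it ran.
    static class EchoJob implements Job {
        private final JobContext jobContext;
        private final AtomicBoolean canceled = new AtomicBoolean(false);

        EchoJob(JobContext jobContext) { this.jobContext = jobContext; }

        @Override public Integer run() {
            if (canceled.get()) {
                return -1; // a non-zero exit code signals failure
            }
            System.out.println("running in " + jobContext.getWorkDir());
            return 0;
        }
        @Override public void cancel() { canceled.set(true); }
        @Override public JobContext getJobContext() { return jobContext; }
        @Override public boolean isCanceled() { return canceled.get(); }
    }

    public static void main(String[] args) throws Exception {
        Job job = new EchoJob(new JobContext("/tmp/demo"));
        System.out.println("exit code: " + job.run());
    }
}
```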

ProcessJob

A Job that runs by creating an operating-system process (Process).

public abstract List<String> getCommandList();

public Integer run() throws Exception{
   int exitCode=-999;
   String jobType=jobContext.getProperties().getAllProperties().get(RunningJobKeys.JOB_RUN_TYPE);
   buildHadoopConf(jobType);
   buildHiveConf(jobType);
   //set environment variables: copy every "instance." and "secret." property
   for(String key:jobContext.getProperties().getAllProperties().keySet()){
      if(jobContext.getProperties().getProperty(key)!=null && (key.startsWith("instance.") || key.startsWith("secret."))){
         envMap.put(key, jobContext.getProperties().getProperty(key));
      }
   }
   envMap.put("instance.workDir", jobContext.getWorkDir());
   List<String> commands=getCommandList();
   //run the commands one by one, stopping at the first non-zero exit code
   for(String s:commands){
      log("DEBUG Command:"+s);
      ProcessBuilder builder = new ProcessBuilder(partitionCommandLine(s));
      builder.directory(new File(jobContext.getWorkDir()));
      builder.environment().putAll(envMap);
      process=builder.start();
      final InputStream inputStream = process.getInputStream();
      final InputStream errorStream = process.getErrorStream();
      String threadName=null;
      if(jobContext.getJobHistory()!=null && jobContext.getJobHistory().getJobId()!=null){
         threadName="jobId="+jobContext.getJobHistory().getJobId();
      }else if(jobContext.getDebugHistory()!=null && jobContext.getDebugHistory().getId()!=null){
         threadName="debugId="+jobContext.getDebugHistory().getId();
      }else{
         threadName="not-normal-job";
      }
      //forward the child's stdout to the job log
      new Thread(new Runnable() {
         @Override
         public void run() {
            try{
               BufferedReader reader=new BufferedReader(new InputStreamReader(inputStream));
               String line;
               while((line=reader.readLine())!=null){
                  logConsole(line);
               }
            }catch(Exception e){
               log(e);
               log("Error while receiving logs, log receiving stopped");
            }
         }
      },threadName).start();
      //forward the child's stderr to the job log
      new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               BufferedReader reader=new BufferedReader(new InputStreamReader(errorStream));
               String line;
               while((line=reader.readLine())!=null){
                  logConsole(line);
               }
            } catch (Exception e) {
               log(e);
               log("Error while receiving logs, log receiving stopped");
            }
         }
      },threadName).start();
      exitCode = -999;
      try {
         exitCode = process.waitFor();
      } catch (InterruptedException e) {
         log(e);
      } finally{
         process=null;
      }
      if(exitCode!=0){
         return exitCode;
      }
   }
   return exitCode;
}

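As you can see, launching a job is simple: the Java code starts a child process. The commands to execute are defined by each subclass, and the environment variables of the process are likewise defined by each subclass.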
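
The launch pattern can be tried in isolation. The sketch below is a simplified illustration, not the Zeus code: it merges stderr into stdout with redirectErrorStream instead of starting two reader threads, and the class name, the Result type and the DEMO_WORK_DIR variable are all made up for the example. It assumes a POSIX sh is available on the PATH.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ProcessLaunchSketch {
    // Exit code plus everything the child wrote to stdout/stderr.
    static class Result {
        final int exitCode;
        final String output;
        Result(int exitCode, String output) { this.exitCode = exitCode; this.output = output; }
    }

    // Runs one command with extra environment variables and waits for it to finish.
    static Result runCommand(List<String> command, Map<String, String> env) throws Exception {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.environment().putAll(env);
        builder.redirectErrorStream(true); // one stream to read, so no second thread is needed
        Process process = builder.start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append('\n');
            }
        }
        return new Result(process.waitFor(), output.toString());
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> env = new HashMap<>();
        env.put("DEMO_WORK_DIR", "/tmp/demo"); // hypothetical variable name
        Result r = runCommand(Arrays.asList("sh", "-c", "echo $DEMO_WORK_DIR"), env);
        System.out.print("exit=" + r.exitCode + " output=" + r.output);
    }
}
```

Reading the output before calling waitFor matters: a child that fills its output pipe while nobody reads it can block forever, which is presumably why the original code drains both streams in background threads.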

public void cancel(){
   try {
      new CancelHadoopJob(jobContext).run();
   } catch (Exception e1) {
      log(e1);
   }
   //force-kill the process
   if (process != null) {
      log("WARN Attempting to kill the process ");
      try {
         process.destroy();
         int pid=getProcessId();
         Runtime.getRuntime().exec("kill -9 "+pid);
      } catch (Exception e) {
         log(e);
      } finally{
         process=null;
      }
   }
}

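Killing a job takes more than killing the local process: if the job runs on the Hadoop platform, the jobs it submitted to Hadoop must be killed as well (a "deep kill").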
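
For the local half of that, a hedged sketch of "ask politely, then force" using only the standard Process API (Java 8+). It swaps the original's kill -9 on a looked-up pid for destroyForcibly, and it does not cover the Hadoop side. The class and method names are illustrative, and the demo assumes a sleep command on the PATH.

```java
import java.util.concurrent.TimeUnit;

class ProcessKillSketch {
    // Requests termination, then escalates if the process is still alive after the grace period.
    static boolean stop(Process process, long graceMillis) throws InterruptedException {
        process.destroy(); // polite request first
        if (!process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
            process.destroyForcibly(); // escalate, similar in intent to kill -9
            process.waitFor(graceMillis, TimeUnit.MILLISECONDS);
        }
        return !process.isAlive();
    }

    public static void main(String[] args) throws Exception {
        Process sleeper = new ProcessBuilder("sleep", "30").start();
        System.out.println("stopped: " + stop(sleeper, 2000));
    }
}
```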

The subclasses are listed below:

JavaJob

A Job that runs a java command line.

public List<String> getCommandList() {
   List<String> commands=new ArrayList<String>();
   String command = JAVA_COMMAND + " ";
   command += getJVMArguments() + " ";
   command += "-Xms" + getInitialMemorySize() + " ";
   command += "-Xmx" + getMaxMemorySize() + " ";
   if(getClassPaths()!=null && !getClassPaths().trim().equals("")){
      command += "-cp " + getClassPaths()+ " ";
   }
   command += getJavaClass() + " ";
   command += getMainArguments();
   commands.add(command);
   return commands;
}

HiveJob

Runs the job through the hive command line.

public List<String> getCommandList() {
   String hiveFilePath = getProperty(PropertyKeys.RUN_HIVE_PATH, "");
   List<String> list = new ArrayList<String>();
   StringBuffer sb = new StringBuffer();
   sb.append("hive");
   // load the commonly used UDF functions
   if (getUdfSql()) {
      sb.append(" -i ").append(jobContext.getWorkDir())
            .append(File.separator).append(UDF_SQL_NAME);
   }
   // run the hive script file
   sb.append(" -f ").append(hiveFilePath);
   list.add(sb.toString());
   return list;
}


public Integer run() throws Exception {
   Date start = new Date();
   Integer exitCode = runInner();
   // if the job failed and the whole run took less than 10 minutes, retry once
   if (exitCode != 0
         && getJobContext().getRunType() == JobContext.SCHEDULE_RUN
         && new Date().getTime() - start.getTime() < 10 * 60 * 1000L) {
      log("Hive Job Fail in 10 min , try to retry");
      exitCode = runInner();
   }
   return exitCode;
}
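
The retry rule is easy to isolate. The sketch below shows the same idea (retry once when a failure happens quickly) with a generic Callable. It is a simplification under stated assumptions: it leaves out the SCHEDULE_RUN check from the original, and the class and method names are made up.

```java
import java.util.concurrent.Callable;

class RetrySketch {
    // Runs the task once; if it fails within the time window, runs it one more time.
    static int runWithOneRetry(Callable<Integer> task, long windowMillis) throws Exception {
        long start = System.currentTimeMillis();
        int exitCode = task.call();
        if (exitCode != 0 && System.currentTimeMillis() - start < windowMillis) {
            exitCode = task.call();
        }
        return exitCode;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails on the first call, succeeds on the second.
        int exitCode = runWithOneRetry(() -> ++calls[0] == 1 ? 1 : 0, 10 * 60 * 1000L);
        System.out.println("exit code " + exitCode + " after " + calls[0] + " call(s)");
    }
}
```

A fast failure is more likely to be a transient problem, such as a failed connection, than a long-running query that dies late, which is a plausible reason for the 10-minute cutoff.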




public Integer runInner() throws Exception {
   String script = getProperties().getLocalProperty(PropertyKeys.JOB_SCRIPT);
   File f = new File(jobContext.getWorkDir() + File.separator
         + (new Date().getTime()) + ".hive");
   if (!f.exists()) {
      f.createNewFile();
   }
   OutputStreamWriter writer = null;
   try {
      writer = new OutputStreamWriter(new FileOutputStream(f),
            Charset.forName(jobContext.getProperties().getProperty("zeus.fs.encode", "utf-8")));
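      // note: without the (?m) flag, ^ matches only at the very start of the script,
      // so this strips a "--" comment on the first line only
      writer.write(script.replaceAll("^--.*", ""));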
   } catch (Exception e) {
      jobContext.getJobHistory().getLog().appendZeusException(e);
   } finally {
      IOUtils.closeQuietly(writer);
   }
   getProperties().setProperty(PropertyKeys.RUN_HIVE_PATH,f.getAbsolutePath());
   return super.run();
}

ShellJob

A job that runs a Shell script.

public List<String> getCommandList() {
   String script=null;
   if(shell!=null){
      script=shell;
   }else{
      script=getProperties().getLocalProperty(PropertyKeys.JOB_SCRIPT);
   }
   OutputStreamWriter writer=null;
   try {
      File f=new File(jobContext.getWorkDir()+File.separator+(new Date().getTime())+".sh");
      if(!f.exists()){
         f.createNewFile();
      }
      writer=new OutputStreamWriter(new FileOutputStream(f),Charset.forName(jobContext.getProperties().getProperty("zeus.fs.encode", "utf-8")));
      writer.write(script);
      getProperties().setProperty(PropertyKeys.RUN_SHELLPATH, f.getAbsolutePath());
   } catch (Exception e) {
      jobContext.getJobHistory().getLog().appendZeusException(e);
   } finally{
      IOUtils.closeQuietly(writer);
   }
   String shellFilePath=getProperty(PropertyKeys.RUN_SHELLPATH, "");
   List<String> list=new ArrayList<String>();
   //change the file permissions
   list.add("chmod u+x " + shellFilePath);
   //convert the line endings
   list.add("dos2unix " + shellFilePath);
   //run the shell script
   list.add("sh "+shellFilePath);
   return list;
}
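
The same "write the script to a file, then return three commands" flow can be sketched with java.nio. This is an illustration, not the Zeus code: it uses Files.createTempFile instead of a timestamp file name, which avoids two scripts written in the same millisecond sharing a name, and the class and method names are made up.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

class ShellCommandSketch {
    // Writes the script into workDir and returns the commands that would run it.
    static List<String> buildCommands(Path workDir, String script) throws IOException {
        Path file = Files.createTempFile(workDir, "job-", ".sh");
        Files.write(file, script.getBytes(StandardCharsets.UTF_8));
        String path = file.toAbsolutePath().toString();
        return Arrays.asList(
                "chmod u+x " + path,  // make it executable
                "dos2unix " + path,   // normalize line endings
                "sh " + path);        // run it
    }

    public static void main(String[] args) throws IOException {
        Path workDir = Files.createTempDirectory("zeus-demo");
        for (String command : buildCommands(workDir, "echo hello\n")) {
            System.out.println(command);
        }
    }
}
```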
UploadHdfsFileJob

A Job that uploads an attachment to HDFS.

public List<String> getCommandList() {
   List<String> commands = new ArrayList<String>();
   String hadoopCmd = JobUtils.getHadoopCmd(envMap);
   commands.add(hadoopCmd + " fs -copyFromLocal " + localFilePath + " "
         + hdfsPath);
   return commands;
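}

MemUseRateJob

Checks the machine's memory usage: it parses the buffers/cache line in the job log and fails when the usage rate exceeds the configured threshold (rate).

public Integer run() throws Exception {
   //on Windows and Mac, return success directly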
   String os=System.getProperties().getProperty("os.name");
   if(os!=null && (os.startsWith("win") || os.startsWith("Win") || os.startsWith("Mac"))){
      //put in a fake number to make development easier
      jobContext.putData("mem", 0.5);
      return 0;
   }
   Integer exitCode=super.run();
   if(exitCode==0){
      String[] content=getJobContext().getJobHistory().getLog().getContent().split("\n");
      for(String s:content){
         if(s.contains("buffers/cache")){
            String line=s.substring(s.indexOf("buffers/cache:"));
            Matcher matcher=pattern.matcher(line);
            double used=0d;
            double free=0d;
            int num=0;
            while(matcher.find()){
               if(num==0){
                  used=Double.valueOf(matcher.group());
                  num++;
                  continue;
               }
               if(num==1){
                  free=Double.valueOf(matcher.group());
                  break;
               }
            }
            if((new Date().getTime()-date.getTime())>3*60*1000){
               ScheduleInfoLog.info("mem use rate used:"+used+" free:"+free+" rate:"+(used/(used+free)));
               date=new Date();
            }
            // the loop may never reach a matching line; when it does, it returns here, so the final statement should return -1
            jobContext.putData("mem", (used/(used+free)));
            if(used/(used+free)>rate){
               return -1;
            }else{
               return 0;
            }
         }
      }
   }
   return -1;
}
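
The parsing step can be sketched on its own. The article does not show how `pattern` is defined, so the `\d+` regex below is an assumption, as are the class name and the sample line, which only imitates the "-/+ buffers/cache:" row printed by older versions of free.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MemRateSketch {
    // Assumed pattern: the first two integers after "buffers/cache:" are used and free.
    private static final Pattern NUMBER = Pattern.compile("\\d+");

    // Returns used/(used+free), or -1 when the line cannot be parsed.
    static double usedRate(String line) {
        int idx = line.indexOf("buffers/cache:");
        if (idx < 0) {
            return -1;
        }
        Matcher matcher = NUMBER.matcher(line.substring(idx));
        if (!matcher.find()) {
            return -1;
        }
        double used = Double.parseDouble(matcher.group());
        if (!matcher.find()) {
            return -1;
        }
        double free = Double.parseDouble(matcher.group());
        if (used + free == 0) {
            return -1; // avoid dividing by zero
        }
        return used / (used + free);
    }

    public static void main(String[] args) {
        System.out.println(usedRate("-/+ buffers/cache:    3000    1000"));
    }
}
```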
MapReduceJob

Before running, collects the job's hdfs:// jar resources into the core-site.tmpjars property.

public Integer run() throws Exception {
   List<Map<String, String>> resources=jobContext.getResources();
   if(resources!=null && !resources.isEmpty()){
      StringBuffer sb=new StringBuffer();
      for(Map<String, String> map:jobContext.getResources()){
         if(map.get("uri")!=null){
            String uri=map.get("uri");
            if(uri.startsWith("hdfs://") && uri.endsWith(".jar")){
               sb.append(uri.substring("hdfs://".length())).append(",");
            }
         }
      }
      // guard: sb is empty when no resource is an hdfs:// jar, and substring(0, -1) would throw
      if(sb.length()>0){
         jobContext.getProperties().setProperty("core-site.tmpjars", sb.substring(0, sb.length()-1));
      }
   }
   return super.run();
}
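
Building the comma-separated list can also be done with String.join, which sidesteps trailing-comma handling altogether. A minimal sketch with illustrative names and URIs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TmpJarsSketch {
    // Keeps only hdfs:// jar URIs, strips the scheme prefix, and joins them with commas.
    static String tmpJars(List<String> uris) {
        List<String> jars = new ArrayList<>();
        for (String uri : uris) {
            if (uri != null && uri.startsWith("hdfs://") && uri.endsWith(".jar")) {
                jars.add(uri.substring("hdfs://".length()));
            }
        }
        return String.join(",", jars); // empty string when nothing matched
    }

    public static void main(String[] args) {
        System.out.println(tmpJars(Arrays.asList(
                "hdfs://cluster/lib/a.jar", "file:///tmp/b.jar", "hdfs://cluster/lib/c.jar")));
    }
}
```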
HadoopShellJob

A ShellJob whose run type is set to HadoopShellJob.

public class HadoopShellJob extends ShellJob{
   public HadoopShellJob(JobContext jobContext) {
      super(jobContext);
      jobContext.getProperties().setProperty(RunningJobKeys.JOB_RUN_TYPE, "HadoopShellJob");
   }
   @Override
   public Integer run() throws Exception {
      return super.run();
   }
}

DownloadHdfsFileJob

Downloads a file from HDFS to the local file system.

public List<String> getCommandList() {
   String hadoopCmd=JobUtils.getHadoopCmd(envMap);
   List<String> commands = new ArrayList<String>();
   commands.add(hadoopCmd+" fs -copyToLocal " + hdfsFilePath + " " + localPath);
   return commands;
}

CopyLocalFileJob

Copies a local file into the job's working directory.

public List<String> getCommandList() {
   String command="cp "+sourcePath+" "+getJobContext().getWorkDir();
   return Arrays.asList(command);
}

CancelHadoopJob

Scans the log, picks out the Hadoop job ids, and kills those jobs.

public List<String> getCommandList() {
   List<String> list = new ArrayList<String>();
   // inspect the log; if it mentions Hadoop jobs, kill those jobs first
   String log = null;
   if (jobContext.getJobHistory() != null) {
      log = jobContext.getJobHistory().getLog().getContent();
   } else if (jobContext.getDebugHistory() != null) {
      log = jobContext.getDebugHistory().getLog().getContent();
   }
   if (log != null) {
      String hadoopCmd=JobUtils.getHadoopCmd(envMap);
      String[] logs = log.split("\n");
      for (String g : logs) {
         String cmd = null;
         if (g.contains("Running job: ")) {// mapreduce
            String jobId = g.substring(g.indexOf("job_"));
            cmd = hadoopCmd+" job -kill " + jobId;
         } else if (g.contains("Starting Job =")) {// hive
            String jobId = g.substring(g.lastIndexOf("job_"));
            cmd = hadoopCmd+" job -kill " + jobId;
         }
         if (cmd != null) {
            list.add(cmd);
         }
      }
   }
   return list;
}
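
The id extraction can be illustrated with a regular expression instead of substring arithmetic. This is a sketch under assumptions: the `job_<digits>_<digits>` id shape and the sample log lines are illustrative, and the class and method names are made up. Using a set also avoids emitting the same kill command twice.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HadoopJobIdSketch {
    // Assumed id shape: job_<cluster timestamp>_<sequence>.
    private static final Pattern JOB_ID = Pattern.compile("job_\\d+_\\d+");

    // Builds one kill command per distinct job id found on the marker lines of the log.
    static List<String> killCommands(String hadoopCmd, String log) {
        Set<String> ids = new LinkedHashSet<>();
        for (String line : log.split("\n")) {
            if (line.contains("Running job: ") || line.contains("Starting Job =")) {
                Matcher matcher = JOB_ID.matcher(line);
                if (matcher.find()) {
                    ids.add(matcher.group());
                }
            }
        }
        List<String> commands = new ArrayList<>();
        for (String id : ids) {
            commands.add(hadoopCmd + " job -kill " + id);
        }
        return commands;
    }

    public static void main(String[] args) {
        String log = "INFO Running job: job_201301011200_0001\n"
                + "unrelated line\n"
                + "Starting Job = job_201301011200_0002, Tracking URL = http://example/x?jobid=job_201301011200_0002";
        killCommands("hadoop", log).forEach(System.out::println);
    }
}
```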
ZooKeeperJob

Notifies ZooKeeper of whether the core job succeeded.

public Integer run() throws Exception {
   Integer exitCode=0;
   try {
      Boolean data=jobContext.getCoreExitCode()==0?true:false;
      if(processer==null){
         JSONObject content=new JSONObject();
         content.put("id",getJobContext().getJobHistory().getJobId());
         content.put("historyId", getJobContext().getJobHistory().getId());
         content.put("status", jobContext.getCoreExitCode()==0?true:false);
         content.put("time", new Date().getTime());
         zkResultNotify.send(jobContext.getJobHistory().getId(), content.toString());
      }else{
         String host=processer.getHost();
         String path=processer.getPath();
         log("Start notifying ZooKeeper host:"+host+",path:"+path);
         zkResultNotify.send(host, path, data.toString());
         log("ZooKeeper notification finished");
      }
   } catch (Exception e) {
      exitCode=-1;
      log("Failed to send the ZK notification");
      log(e);
   }
   return exitCode;
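}

OutputCleanJob

After a successful run, deletes the pt=yyyyMMdd partition directories that are older than the configured number of days.

public Integer run() throws Exception {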
   if(jobContext.getCoreExitCode()!=0){
      log("Job failed, skipping the output directory cleanup");
      return 0;
   }
   log("OutputClean: starting the output directory cleanup");
   String path=ocp.getPath();
   FileSystem fs=FileSystem.get(ConfUtil.getDefaultCoreSite());
   FileStatus[] pathFiles=fs.listStatus(new Path(path));
   boolean valid=true;
   for(FileStatus f:pathFiles){
      if(f.isDir()){
         valid=false;
         log("The output path contains directories; the path is probably misconfigured, refusing to clean up");
         break;
      }
   }
   if(!valid){
      return 0;
   }
   String upperPath=path;
   if(upperPath.endsWith("/")){
      upperPath=upperPath.substring(0,path.length()-1);
   }
   upperPath=upperPath.substring(0,upperPath.lastIndexOf("/"));
   Path hdfsPath=new Path(upperPath);
   SimpleDateFormat format=new SimpleDateFormat("yyyyMMdd");
   Calendar cal=Calendar.getInstance();
   cal.add(Calendar.DAY_OF_YEAR, ocp.getDays()*(-1));
   Date limit=format.parse(format.format(cal.getTime()));
   FileStatus[] files=fs.listStatus(hdfsPath);
   for(FileStatus f:files){
      String tmpPath=f.getPath().toString();
      if(tmpPath.contains("/pt=")){
         String yyyyMMdd=tmpPath.substring(tmpPath.indexOf("/pt=")+4,tmpPath.indexOf("/pt=")+12);
         Date fdate=format.parse(yyyyMMdd);
         if(fdate.before(limit)){
            jobContext.getJobHistory().getLog().appendZeus("Deleting directory: " +tmpPath);
            fs.delete(f.getPath(),true);
         }
      }
   }
   return 0;
}
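
The expiry decision (parse the date out of a pt=yyyyMMdd path segment and compare it with today minus the retention period) can be sketched with java.time. This is an illustration with made-up names and paths; unlike the original it skips paths that are too short, and it would still throw a DateTimeParseException if the eight characters are not a valid date.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

class PartitionAgeSketch {
    // BASIC_ISO_DATE parses dates written as yyyyMMdd.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    // True when the path has a pt=yyyyMMdd segment dated before (today - keepDays).
    static boolean isExpired(String path, LocalDate today, int keepDays) {
        int idx = path.indexOf("/pt=");
        if (idx < 0 || path.length() < idx + 12) {
            return false; // not a partition directory
        }
        LocalDate partition = LocalDate.parse(path.substring(idx + 4, idx + 12), FORMAT);
        return partition.isBefore(today.minusDays(keepDays));
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2013, 1, 10);
        System.out.println(isExpired("hdfs://nn/out/pt=20130101", today, 7));
        System.out.println(isExpired("hdfs://nn/out/pt=20130105", today, 7));
    }
}
```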
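OutputCheckJob

Compares the size of this run's output with the average size of the sibling directories, and logs an alarm when the difference exceeds the configured percentage.

public Integer run() throws Exception {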
   if (jobContext.getCoreExitCode() != 0) {
      jobContext.getJobHistory().getLog()
            .appendZeus("Job failed, skipping the output data size check");
      return 0;
   }
   jobContext.getJobHistory().getLog()
         .appendZeus("OutputCheck: starting the output data size check");
   String upperPath = path;
   if (upperPath.endsWith("/")) {
      upperPath = upperPath.substring(0, path.length() - 1);
   }
   upperPath = upperPath.substring(0, upperPath.lastIndexOf("/"));
   Path hdfsPath = new Path(upperPath);
   FileSystem fs = FileSystem.get(ConfUtil.getDefaultCoreSite());
   FileStatus[] files = fs.listStatus(hdfsPath);
   double total = 0;
   List<ContentSummary> dirFiles = new ArrayList<ContentSummary>();
   for (FileStatus f : files) {
      if (f.isDir()) {
         ContentSummary cs = fs.getContentSummary(f.getPath());
         if (cs.getLength() > 0) {
            dirFiles.add(cs);
            total += cs.getLength();
         }
      }
   }
   double ava = total / dirFiles.size();
   double upper = ava * 1.5;
   double lower = ava * 0.5;
   List<ContentSummary> valid = new ArrayList<ContentSummary>();
   for (ContentSummary cs : dirFiles) {
      if (cs.getLength() < upper && cs.getLength() > lower) {
         valid.add(cs);
      }
   }
   total = 0d;
   for (ContentSummary cs : valid) {
      total += cs.getLength();
   }
   ava = total / valid.size();
   jobContext.getJobHistory().getLog().appendZeus("Parent path of the output data: " + upperPath);
   jobContext.getJobHistory().getLog()
         .appendZeus("Number of valid reference directories: " + valid.size());
   jobContext.getJobHistory().getLog().appendZeus("Average output data size: " + ava);
   jobContext.getJobHistory().getLog()
         .appendZeus("Configured size drift percentage: " + ocp.getPercent() + "%");
   jobContext.getJobHistory().getLog().appendZeus("Output path of the current job: " + path);
   ContentSummary current = null;
   try {
      current = fs.getContentSummary(new Path(path));
   } catch (Exception e) {
      log("There is a problem with the output directory of this job run");
      log(e);
   }
   if (current != null) {
      jobContext.getJobHistory().getLog()
            .appendZeus("Output data size of this job run: " + current.getLength());
   } else {
      return -1;
   }
   if ((Math.abs(current.getLength() - ava) / ava) > (ocp.getPercent() / 100.0)) {
      double rate = Math.abs(current.getLength() - ava) / ava;
      if (rate > (ocp.getPercent() / 100.0)) {
         // outside the allowed drift range
         jobContext.getJobHistory().getLog().appendZeus("Exceeded the configured drift percentage, raising an alarm");
         String jobId = jobContext.getJobHistory().getJobId();
         StringBuffer sb = new StringBuffer("jobid=" + jobId
               + " output data size exceeds the drift percentage " + ocp.getPercent() + "%");
         sb.append("\nAverage output data size: " + ava);
         sb.append("\nOutput data size of this run: " + current.getLength());
      }
   } else {
      jobContext.getJobHistory().getLog().appendZeus("Output data check OK");
   }
   return 0;
}
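
The arithmetic is easier to follow without the HDFS calls. The sketch below reproduces the two steps on plain numbers: first average only the sizes that lie strictly between 0.5x and 1.5x of the plain average, then test whether the current size drifts from that reference by more than the given percentage. The class and method names are made up for illustration.

```java
class OutputSizeSketch {
    // Average of the sizes strictly between 0.5x and 1.5x the plain average; NaN if none qualify.
    static double referenceAverage(long[] sizes) {
        double total = 0;
        for (long size : sizes) {
            total += size;
        }
        double average = total / sizes.length;
        double sum = 0;
        int count = 0;
        for (long size : sizes) {
            if (size > average * 0.5 && size < average * 1.5) {
                sum += size;
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    // True when |current - reference| / reference is larger than percent / 100.
    static boolean exceedsDrift(long current, double reference, double percent) {
        return Math.abs(current - reference) / reference > percent / 100.0;
    }

    public static void main(String[] args) {
        // Plain average is 140, so the 300 outlier is dropped and the reference becomes 100.
        double reference = referenceAverage(new long[] {100, 100, 100, 100, 300});
        System.out.println(reference);
        System.out.println(exceedsDrift(130, reference, 20));
    }
}
```

Note the NaN case: when no directory falls inside the band, every comparison against the reference is false, so no alarm would be raised. The original code has the same property because it divides by valid.size().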
MavenDownloadJob

Downloads the configured Maven jars into the job's working directory.

public Integer run() {
   Integer exitCode = 0;
   for (MavenConfig mc : files) {
      String downloadUrl = mc.getUrl();
      if (downloadUrl == null || "".equals(downloadUrl)) {
         // standard Maven repository layout: group/artifact/version/artifact-version.jar
         downloadUrl = "http://mvnrepo.taobao.ali.com:8081/nexus/content/groups/public/"
               + StringUtils.replaceChars(mc.getGroup(), '.', '/')
               + "/"
               + mc.getArtifact()
               + "/"
               + mc.getVersion()
               + "/"
               + mc.getArtifact() + "-" + mc.getVersion() + ".jar";
      }
      try {
         jobContext.getJobHistory().getLog()
               .appendZeus("Start downloading maven file: " + downloadUrl);
         download(downloadUrl, jobContext.getWorkDir() + File.separator
               + downloadUrl.substring(downloadUrl.lastIndexOf("/")));
         jobContext.getJobHistory().getLog()
               .appendZeus("Downloaded maven file: " + downloadUrl + " successfully");
      } catch (Exception e) {
         jobContext.getJobHistory().getLog().appendZeusException(e);
         exitCode = -1;
      }
   }
   return exitCode;
}
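
For reference, the standard Maven repository layout places a jar at group/artifact/version/artifact-version.jar, with the dots in the group id turned into slashes. A small sketch of building such a URL, where the repository base URL and the class and method names are placeholders:

```java
class MavenUrlSketch {
    // Builds the jar URL for the given coordinates under the standard repository layout.
    static String jarUrl(String repoBase, String group, String artifact, String version) {
        String base = repoBase.endsWith("/") ? repoBase : repoBase + "/";
        return base + group.replace('.', '/')
                + "/" + artifact
                + "/" + version
                + "/" + artifact + "-" + version + ".jar";
    }

    public static void main(String[] args) {
        // repo.example.org is a placeholder host, not a real repository.
        System.out.println(jarUrl("https://repo.example.org/maven2",
                "org.apache.commons", "commons-lang3", "3.12.0"));
    }
}
```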
MailJob

Sends a mail whose body comes from the processer's template.

public Integer run() throws Exception {
   String render=processer.getTemplate();
   jobContext.getJobHistory().getLog().appendZeus("Starting the send-mail job");
   try {
      mailAlarm.alarm(getJobContext().getJobHistory().getId(), processer.getSubject(), render);
      jobContext.getJobHistory().getLog().appendZeus("Mail sent successfully");
   } catch (Exception e) {
      jobContext.getJobHistory().getLog().appendZeusException(e);
   }
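   return 0;
}

HiveProcesserJob

A processing step for Hive jobs: once the core job has run, it can check the output partitions and clean up historical partitions.

public Integer run() {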
   /************************
    *  pre-processing
    ************************/
   if (jobContext.getCoreExitCode() == null) {
   } else {
   /*****************************
    *  post-processing
    *****************************/
      if (processer.getOutputTables() != null
            && processer.getOutputTables().size() > 0) {
         if (processer.getDriftPercent() != null) {
            try {
               log("Hive partition output check started");
               exitCode = new HiveOutputCheckJob(jobContext,
                     processer, applicationContext).run();
            } catch (Exception e) {
               log("Hive partition output check failed");
               log(e);
            } finally {
               log("Hive partition output check finished");
            }
         }
         if (processer.getKeepDays() != null) {
            try {
               log("Historical partition cleanup started");
               exitCode = new HivePartitionCleanJob(jobContext,
                     processer, applicationContext).run();
            } catch (Exception e) {
               log("Historical partition cleanup failed");
               log(e);
            } finally {
               log("Historical partition cleanup finished");
            }
         }
      }
   }
   return exitCode;
}