Job
public interface Job {
Integer run() throws Exception;
void cancel();
JobContext getJobContext();
boolean isCanceled();
}
- run: execution entry point
- cancel: entry point for cancelling the task
- getJobContext: returns the task's context
Task logs are stored in the database.
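To make the lifecycle concrete, here is a minimal sketch of what an implementation has to provide; the class name and the fake "work" below are illustrative assumptions, not Zeus code:
// Hypothetical minimal Job: pretends to work until it finishes or is cancelled.
public class SleepJob implements Job {
    private final JobContext jobContext;
    private volatile boolean canceled = false;
    public SleepJob(JobContext jobContext) {
        this.jobContext = jobContext;
    }
    @Override
    public Integer run() throws Exception {
        for (int i = 0; i < 10 && !canceled; i++) {
            Thread.sleep(1000L); // stand-in for real work
        }
        return canceled ? -1 : 0; // 0 means success, any other value is a failure
    }
    @Override
    public void cancel() {
        canceled = true;
    }
    @Override
    public JobContext getJobContext() {
        return jobContext;
    }
    @Override
    public boolean isCanceled() {
        return canceled;
    }
}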
ProcessJob
A Job that runs by creating an operating-system process (Process); each subclass only has to provide the command list:
public abstract List<String> getCommandList();
public Integer run() throws Exception{
int exitCode=-999;
String jobType=jobContext.getProperties().getAllProperties().get(RunningJobKeys.JOB_RUN_TYPE);
buildHadoopConf(jobType);
buildHiveConf(jobType); // set up environment variables
for(String key:jobContext.getProperties().getAllProperties().keySet()){
if(jobContext.getProperties().getProperty(key)!=null && (key.startsWith("instance.") || key.startsWith("secret."))){
envMap.put(key, jobContext.getProperties().getProperty(key));
}
}
envMap.put("instance.workDir", jobContext.getWorkDir());
List<String> commands=getCommandList();
for(String s:commands){
log("DEBUG Command:"+s);
ProcessBuilder builder = new ProcessBuilder(partitionCommandLine(s));
builder.directory(new File(jobContext.getWorkDir()));
builder.environment().putAll(envMap);
process=builder.start();
final InputStream inputStream = process.getInputStream();
final InputStream errorStream = process.getErrorStream();
String threadName=null;
if(jobContext.getJobHistory()!=null && jobContext.getJobHistory().getJobId()!=null){
threadName="jobId="+jobContext.getJobHistory().getJobId();
}else if(jobContext.getDebugHistory()!=null && jobContext.getDebugHistory().getId()!=null){
threadName="debugId="+jobContext.getDebugHistory().getId();
}else{
threadName="not-normal-job";
}
new Thread(new Runnable() {
@Override
public void run() {
try{
BufferedReader reader=new BufferedReader(new InputStreamReader(inputStream));
String line;
while((line=reader.readLine())!=null){
logConsole(line);
}
}catch(Exception e){
log(e);
log("接收日志出错,推出日志接收");
}
}
},threadName).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
BufferedReader reader=new BufferedReader(new InputStreamReader(errorStream));
String line;
while((line=reader.readLine())!=null){
logConsole(line);
}
} catch (Exception e) {
log(e);
log("接收日志出错,推出日志接收");
}
}
},threadName).start();
exitCode = -999;
try {
exitCode = process.waitFor();
} catch (InterruptedException e) {
log(e);
} finally{
process=null;
}
if(exitCode!=0){
return exitCode;
}
}
return exitCode;
}
As you can see, the way a task is launched is very simple: a child process is forked from the Java process, and each subclass defines for itself both the command to execute and the environment variables of that process.
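As an illustration, a hypothetical subclass only has to return the command lines it wants executed; the class below is a sketch (the constructor signature is assumed from the other subclasses shown later), not part of Zeus:
import java.util.ArrayList;
import java.util.List;
// Hypothetical ProcessJob subclass that runs a single echo command.
public class EchoJob extends ProcessJob {
    public EchoJob(JobContext jobContext) {
        super(jobContext);
    }
    @Override
    public List<String> getCommandList() {
        List<String> commands = new ArrayList<String>();
        // Each entry is one command line; ProcessJob runs them in order
        // and stops at the first non-zero exit code.
        commands.add("echo hello from " + jobContext.getWorkDir());
        return commands;
    }
}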
public void cancel(){
try {
new CancelHadoopJob(jobContext).run();
} catch (Exception e1) {
log(e1);
} // force-kill the process
if (process != null) {
log("WARN Attempting to kill the process ");
try {
process.destroy();
int pid=getProcessId();
Runtime.getRuntime().exec("kill -9 "+pid);
} catch (Exception e) {
log(e);
} finally{
process=null;
}
}
}
Killing a task means more than killing the local process: if it is a Hadoop job, the corresponding jobs on the Hadoop cluster must also be killed in depth (see CancelHadoopJob below).
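The getProcessId() helper is not shown above; on a pre-Java 9 UNIX JVM it is typically implemented by reflecting on the concrete Process class, roughly as in this sketch (the field name is a JDK-internal detail, and the whole method is an assumption about how Zeus obtains the pid):
// Sketch: read the pid of a java.lang.Process via reflection (UNIX, JDK < 9).
private int getProcessId() {
    int pid = 0;
    try {
        // On UNIX-like JVMs the runtime class is java.lang.UNIXProcess,
        // which keeps the pid in a private int field named "pid".
        java.lang.reflect.Field field = process.getClass().getDeclaredField("pid");
        field.setAccessible(true);
        pid = field.getInt(process);
    } catch (Exception e) {
        log(e);
    }
    return pid;
}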
The concrete subclasses are listed below:
JavaJob
Represents a Job that runs a java command line.
public List<String> getCommandList() {
List<String> commands=new ArrayList<String>();
String command = JAVA_COMMAND + " ";
command += getJVMArguments() + " ";
command += "-Xms" + getInitialMemorySize() + " ";
command += "-Xmx" + getMaxMemorySize() + " ";
if(getClassPaths()!=null && !getClassPaths().trim().equals("")){
command += "-cp " + getClassPaths()+ " ";
}
command += getJavaClass() + " ";
command += getMainArguments();
commands.add(command);
return commands;
}
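With hypothetical property values, say -server as the JVM argument, a 512m/2048m heap, ./lib/* as the classpath and com.example.Main as the main class, the assembled command line would look roughly like:
java -server -Xms512m -Xmx2048m -cp ./lib/* com.example.Main --date 20140101
Note that the whole command is added as a single string; ProcessJob splits it with partitionCommandLine before handing it to ProcessBuilder.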
HiveJob
Runs the task through the hive command line.
public List<String> getCommandList() {
String hiveFilePath = getProperty(PropertyKeys.RUN_HIVE_PATH, "");
List<String> list = new ArrayList<String>();
StringBuffer sb = new StringBuffer();
sb.append("hive");
// include the commonly used UDF functions
if (getUdfSql()) {
sb.append(" -i ").append(jobContext.getWorkDir())
.append(File.separator).append(UDF_SQL_NAME);
}
sb.append(" -f ").append(hiveFilePath); // 执行shell
list.add(sb.toString());
return list;
}
public Integer run() throws Exception {
Date start = new Date();
Integer exitCode = runInner(); // if the job fails and the whole run took less than 10 minutes, retry once
if (exitCode != 0
&& getJobContext().getRunType() == JobContext.SCHEDULE_RUN
&& new Date().getTime() - start.getTime() < 10 * 60 * 1000L) {
log("Hive Job Fail in 10 min , try to retry");
exitCode = runInner();
}
return exitCode;
}
public Integer runInner() throws Exception {
String script = getProperties().getLocalProperty(PropertyKeys.JOB_SCRIPT);
File f = new File(jobContext.getWorkDir() + File.separator
+ (new Date().getTime()) + ".hive");
if (!f.exists()) {
f.createNewFile();
}
OutputStreamWriter writer = null;
try {
writer = new OutputStreamWriter(new FileOutputStream(f),
Charset.forName(jobContext.getProperties().getProperty("zeus.fs.encode", "utf-8")));
writer.write(script.replaceAll("^--.*", "")); // drop a leading "--" comment line
} catch (Exception e) {
jobContext.getJobHistory().getLog().appendZeusException(e);
} finally {
IOUtils.closeQuietly(writer);
}
getProperties().setProperty(PropertyKeys.RUN_HIVE_PATH,f.getAbsolutePath());
return super.run();
}
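Putting getCommandList and runInner together, the command that ProcessJob eventually executes for a HiveJob has this shape (the angle-bracket parts stand for the work directory, the UDF file name and the generated script name):
hive -i <workDir>/<UDF_SQL_NAME> -f <workDir>/<timestamp>.hive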
ShellJob
A task that runs a shell script.
public List<String> getCommandList() {
String script=null;
if(shell!=null){
script=shell;
}else{
script=getProperties().getLocalProperty(PropertyKeys.JOB_SCRIPT);
}
OutputStreamWriter writer=null;
try {
File f=new File(jobContext.getWorkDir()+File.separator+(new Date().getTime())+".sh");
if(!f.exists()){
f.createNewFile();
}
writer=new OutputStreamWriter(new FileOutputStream(f),Charset.forName(jobContext.getProperties().getProperty("zeus.fs.encode", "utf-8")));
writer.write(script);
getProperties().setProperty(PropertyKeys.RUN_SHELLPATH, f.getAbsolutePath());
} catch (Exception e) {
jobContext.getJobHistory().getLog().appendZeusException(e);
} finally{
IOUtils.closeQuietly(writer);
}
String shellFilePath=getProperty(PropertyKeys.RUN_SHELLPATH, "");
List<String> list=new ArrayList<String>(); //修改权限
list.add("chmod u+x " + shellFilePath); //格式转换
list.add("dos2unix " + shellFilePath); //执行shell
list.add("sh "+shellFilePath);
return list;
}
UploadHdfsFileJob
A Job that uploads an attachment to HDFS.
public List<String> getCommandList() {
List<String> commands = new ArrayList<String>();
String hadoopCmd = JobUtils.getHadoopCmd(envMap);
commands.add(hadoopCmd + " fs -copyFromLocal " + localFilePath + " "
+ hdfsPath);
return commands;
}
MemUseRateJob
public Integer run() throws Exception {
// On Windows and Mac, return success directly
String os=System.getProperties().getProperty("os.name");
if(os!=null && (os.startsWith("win") || os.startsWith("Win") || os.startsWith("Mac"))){
// put in a fake value to make local development easier
jobContext.putData("mem", 0.5);
return 0;
}
Integer exitCode=super.run();
if(exitCode==0){
String[] content=getJobContext().getJobHistory().getLog().getContent().split("\n");
for(String s:content){
if(s.contains("buffers/cache")){
String line=s.substring(s.indexOf("buffers/cache:"));
Matcher matcher=pattern.matcher(line);
double used=0d;
double free=0d;
int num=0;
while(matcher.find()){
if(num==0){
used=Double.valueOf(matcher.group());
num++;
continue;
}
if(num==1){
free=Double.valueOf(matcher.group());
break;
}
}
if((new Date().getTime()-date.getTime())>3*60*1000){
ScheduleInfoLog.info("mem use rate used:"+used+" free:"+free+" rate:"+(used/(used+free)));
date=new Date();
}
// the loop may never reach this point; if it does, the method returns here, which is why it falls through to return -1 at the end
jobContext.putData("mem", (used/(used+free)));
if(used/(used+free)>rate){
return -1;
}else{
return 0;
}
}
}
}
return -1;
}
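The command list of this job (not shown here) presumably runs the free command on the worker; on older procps versions its output contains a line like the one below, from which the two numbers after "buffers/cache:" are taken as used and free (the values are illustrative):
-/+ buffers/cache:    7973352    8359276
With these values the job would record a usage rate of 7973352 / (7973352 + 8359276) ≈ 0.49 and return 0 as long as that stays below the configured rate.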
MapReduceJob
public Integer run() throws Exception {
List<Map<String, String>> resources=jobContext.getResources();
if(resources!=null && !resources.isEmpty()){
StringBuffer sb=new StringBuffer();
for(Map<String, String> map:jobContext.getResources()){
if(map.get("uri")!=null){
String uri=map.get("uri");
if(uri.startsWith("hdfs://") && uri.endsWith(".jar")){
sb.append(uri.substring("hdfs://".length())).append(",");
}
}
}
jobContext.getProperties().setProperty("core-site.tmpjars", sb.toString().substring(0, sb.toString().length()-1));
}
return super.run();
}
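For example, if the job's resources contained the hypothetical URIs hdfs://nn/udf/a.jar and hdfs://nn/udf/b.jar, the hdfs:// prefix is stripped from each and core-site.tmpjars ends up as nn/udf/a.jar,nn/udf/b.jar, so that the jars are shipped along with the MapReduce job.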
HadoopShellJob
public class HadoopShellJob extends ShellJob{
public HadoopShellJob(JobContext jobContext) {
super(jobContext);
jobContext.getProperties().setProperty(RunningJobKeys.JOB_RUN_TYPE, "HadoopShellJob");
}
@Override
public Integer run() throws Exception {
return super.run();
}
}
DownloadHdfsFileJob
Downloads a file from HDFS to the local file system.
public List<String> getCommandList() {
String hadoopCmd=JobUtils.getHadoopCmd(envMap);
List<String> commands = new ArrayList<String>();
commands.add(hadoopCmd+" fs -copyToLocal " + hdfsFilePath + " " + localPath);
return commands;
}
CopyLocalFileJob
public List<String> getCommandList() {
String command="cp "+sourcePath+" "+getJobContext().getWorkDir();
return Arrays.asList(command);
}
CancelHadoopJob
Scans the log, picks out the Hadoop job IDs, and kills those jobs.
public List<String> getCommandList() {
List<String> list = new ArrayList<String>();
// inspect the log; if Hadoop jobs were launched, kill those jobs first
String log = null;
if (jobContext.getJobHistory() != null) {
log = jobContext.getJobHistory().getLog().getContent();
} else if (jobContext.getDebugHistory() != null) {
log = jobContext.getDebugHistory().getLog().getContent();
}
if (log != null) {
String hadoopCmd=JobUtils.getHadoopCmd(envMap);
String[] logs = log.split("\n");
for (String g : logs) {
String cmd = null;
if (g.contains("Running job: ")) {// mapreduce
String jobId = g.substring(g.indexOf("job_"));
cmd = hadoopCmd+" job -kill " + jobId;
} else if (g.contains("Starting Job =")) {// hive
String jobId = g.substring(g.lastIndexOf("job_"));
cmd = hadoopCmd+" job -kill " + jobId;
}
if (cmd != null) {
list.add(cmd);
}
}
}
return list;
}
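The two markers correspond to the console output of the plain MapReduce client and of Hive respectively. Simplified, illustrative log lines (the job IDs are made up) and the kill commands they would produce, assuming JobUtils.getHadoopCmd resolves to hadoop:
INFO mapred.JobClient: Running job: job_201401010000_0042   ->  hadoop job -kill job_201401010000_0042
Starting Job = job_201401010000_0043                        ->  hadoop job -kill job_201401010000_0043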
ZooKeeperJob
public Integer run() throws Exception {
Integer exitCode=0;
try {
Boolean data=jobContext.getCoreExitCode()==0?true:false;
if(processer==null){
JSONObject content=new JSONObject();
content.put("id",getJobContext().getJobHistory().getJobId());
content.put("historyId", getJobContext().getJobHistory().getId());
content.put("status", jobContext.getCoreExitCode()==0?true:false);
content.put("time", new Date().getTime());
zkResultNotify.send(jobContext.getJobHistory().getId(), content.toString());
}else{
String host=processer.getHost();
String path=processer.getPath();
log("开始通知ZooKeeper host:"+host+",path:"+path);
zkResultNotify.send(host, path, data.toString());
log("ZooKeeper通知完成");
}
} catch (Exception e) {
exitCode=-1;
log("ZK通知发送失败");
log(e);
}
return exitCode;
}
OutputCleanJob
public Integer run() throws Exception {
if(jobContext.getCoreExitCode()!=0){
log("Job 运行失败,不进行产出目录清理");
return 0;
}
log("OutputClean 开始进行产出目录清理");
String path=ocp.getPath();
FileSystem fs=FileSystem.get(ConfUtil.getDefaultCoreSite());
FileStatus[] pathFiles=fs.listStatus(new Path(path));
boolean valid=true;
for(FileStatus f:pathFiles){
if(f.isDir()){
valid=false;
log("产出路径下面有文件夹,怀疑路径设置有错,拒绝执行清理操作");
break;
}
}
if(!valid){
return 0;
}
String upperPath=path;
if(upperPath.endsWith("/")){
upperPath=upperPath.substring(0,path.length()-1);
}
upperPath=upperPath.substring(0,upperPath.lastIndexOf("/"));
Path hdfsPath=new Path(upperPath);
SimpleDateFormat format=new SimpleDateFormat("yyyyMMdd");
Calendar cal=Calendar.getInstance();
cal.add(Calendar.DAY_OF_YEAR, ocp.getDays()*(-1));
Date limit=format.parse(format.format(cal.getTime()));
FileStatus[] files=fs.listStatus(hdfsPath);
for(FileStatus f:files){
String tmpPath=f.getPath().toString();
if(tmpPath.contains("/pt=")){
String yyyyMMdd=tmpPath.substring(tmpPath.indexOf("/pt=")+4,tmpPath.indexOf("/pt=")+12);
Date fdate=format.parse(yyyyMMdd);
if(fdate.before(limit)){
jobContext.getJobHistory().getLog().appendZeus("删除目录:" +tmpPath);
fs.delete(f.getPath(),true);
}
}
}
return 0;
}
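In other words, the job walks the parent directory of the configured output path and deletes the sibling /pt=yyyyMMdd partitions that are older than the configured number of days. With hypothetical paths, ocp.getDays() = 7 and today being 20140110 (so the limit is 20140103):
/warehouse/tbl/pt=20140101   older than the limit, deleted
/warehouse/tbl/pt=20140109   kept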
OutputCheckJob
public Integer run() throws Exception {
if (jobContext.getCoreExitCode() != 0) {
jobContext.getJobHistory().getLog()
.appendZeus("Job failed, skipping output data size check");
return 0;
}
jobContext.getJobHistory().getLog()
.appendZeus("OutputCheck: starting output data size check");
String upperPath = path;
if (upperPath.endsWith("/")) {
upperPath = upperPath.substring(0, path.length() - 1);
}
upperPath = upperPath.substring(0, upperPath.lastIndexOf("/"));
Path hdfsPath = new Path(upperPath);
FileSystem fs = FileSystem.get(ConfUtil.getDefaultCoreSite());
FileStatus[] files = fs.listStatus(hdfsPath);
double total = 0;
List<ContentSummary> dirFiles = new ArrayList<ContentSummary>();
for (FileStatus f : files) {
if (f.isDir()) {
ContentSummary cs = fs.getContentSummary(f.getPath());
if (cs.getLength() > 0) {
dirFiles.add(cs);
total += cs.getLength();
}
}
}
double ava = total / dirFiles.size();
double upper = ava * 1.5;
double lower = ava * 0.5;
List<ContentSummary> valid = new ArrayList<ContentSummary>();
for (ContentSummary cs : dirFiles) {
if (cs.getLength() < upper && cs.getLength() > lower) {
valid.add(cs);
}
}
total = 0d;
for (ContentSummary cs : valid) {
total += cs.getLength();
}
ava = total / valid.size();
jobContext.getJobHistory().getLog().appendZeus("产出数据上层路径:" + upperPath);
jobContext.getJobHistory().getLog()
.appendZeus("有效的参考文件夹个数:" + valid.size());
jobContext.getJobHistory().getLog().appendZeus("平均产出数据大小:" + ava);
jobContext.getJobHistory().getLog()
.appendZeus("设定数据大小浮动百分比:" + ocp.getPercent() + "%");
jobContext.getJobHistory().getLog().appendZeus("当前任务产出数据路径:" + path);
ContentSummary current = null;
try {
current = fs.getContentSummary(new Path(path));
} catch (Exception e) {
log("本次job产出数据的文件夹有问题");
log(e);
}
if (current != null) {
jobContext.getJobHistory().getLog()
.appendZeus("Output data size of this run: " + current.getLength());
} else {
return -1;
}
if ((Math.abs(current.getLength() - ava) / ava) > (ocp.getPercent() / 100.0)) {
double rate = Math.abs(current.getLength() - ava) / ava;
if (rate > (ocp.getPercent() / 100.0)) {
// outside the allowed fluctuation range
jobContext.getJobHistory().getLog().appendZeus("Outside the configured fluctuation range, raising an alert");
String jobId = jobContext.getJobHistory().getJobId();
StringBuffer sb = new StringBuffer("jobid=" + jobId
+ " output data size exceeds the fluctuation range of " + ocp.getPercent() + "%");
sb.append("\nAverage output data size: " + ava);
sb.append("\nOutput data size of this run: " + current.getLength());
}
} else {
jobContext.getJobHistory().getLog().appendZeus("Output data check OK");
}
return 0;
}
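A worked example with made-up numbers: suppose the sibling partition directories have sizes 90, 100, 110 and 400 (arbitrary units) and ocp.getPercent() is 20. The first average is 700 / 4 = 175, so only directories between 87.5 and 262.5 are kept as references; the 400 outlier is dropped and the reference average becomes (90 + 100 + 110) / 3 = 100. A current output of 130 deviates by 30% and triggers the alert branch, while an output of 115 deviates by only 15% and passes.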
MavenDownloadJob
public Integer run() {
Integer exitCode = 0;
for (MavenConfig mc : files) {
String downloadUrl = mc.getUrl();
if (downloadUrl == null || "".equals(downloadUrl)) {
downloadUrl = "http://mvnrepo.taobao.ali.com:8081/nexus/content/groups/public/"
+ StringUtils.replaceChars(mc.getGroup(), '.', '/')
+ "/"
+ mc.getArtifact()
+ "/"
+ mc.getArtifact()
+ mc.getArtifact() + "-" + mc.getVersion() + ".jar";
}
try {
jobContext.getJobHistory().getLog()
.appendZeus("Starting download of maven dependency: " + downloadUrl);
download(downloadUrl, jobContext.getWorkDir() + File.separator + downloadUrl.substring(downloadUrl.lastIndexOf("/")));
jobContext.getJobHistory().getLog()
.appendZeus("Downloaded maven dependency " + downloadUrl + " successfully");
} catch (Exception e) {
jobContext.getJobHistory().getLog().appendZeusException(e);
exitCode = -1;
}
}
return exitCode;
}
MailJob
public Integer run() throws Exception {
String render=processer.getTemplate();
jobContext.getJobHistory().getLog().appendZeus("开始执行发送邮件job");
try {
mailAlarm.alarm(getJobContext().getJobHistory().getId(), processer.getSubject(), render);
jobContext.getJobHistory().getLog().appendZeus("邮件发送成功");
} catch (Exception e) {
jobContext.getJobHistory().getLog().appendZeusException(e);
}
return 0;
}
HiveProcesserJob
public Integer run() {
Integer exitCode = 0;
/************************
 * pre-processing (the core job has not run yet, so getCoreExitCode() is null)
 ************************/
if (jobContext.getCoreExitCode() == null) {
} else {
/*****************************
 * post-processing (the core job has finished)
 *****************************/
if (processer.getOutputTables() != null
&& processer.getOutputTables().size() > 0) {
if (processer.getDriftPercent() != null) {
try {
log("hive分区产出检查开始");
exitCode = new HiveOutputCheckJob(jobContext,
processer, applicationContext).run();
} catch (Exception e) {
log("hive分区产出检查失败");
log(e);
} finally {
log("hive分区产出检查结束");
}
}
if (processer.getKeepDays() != null) {
try {
log("历史分区清理开始");
exitCode = new HivePartitionCleanJob(jobContext,
processer, applicationContext).run();
} catch (Exception e) {
log("历史分区清理失败");
log(e);
} finally {
log("历史分区清理结束");
}
}
}
}
return exitCode;
}