一 主体流程
-
任务执行主体流程
二 初始化
2.1 配置
- 执行器xml配置
<bean id="xxlJobSpringExecutor" class="com.xxl.job.core.executor.impl.XxlJobSpringExecutor" init-method="start" destroy-method="destroy" >
<!-- Admin (registry center) addresses [optional]; auto-registration is disabled when empty -->
<property name="adminAddresses" value="${xxl.job.admin.addresses}" />
<!-- Executor AppName [optional]; auto-registration is disabled when empty -->
<property name="appName" value="${xxl.job.executor.appname}" />
<!-- Executor IP [optional]; detected automatically when empty -->
<property name="ip" value="${xxl.job.executor.ip}" />
<!-- Executor port [optional]; chosen automatically when less than or equal to 0 -->
<property name="port" value="${xxl.job.executor.port}" />
<!-- Access token [optional]; checked for a match when non-empty -->
<property name="accessToken" value="${xxl.job.accessToken}" />
<!-- Executor log path [optional]; the default path is used when empty -->
<property name="logPath" value="${xxl.job.executor.logpath}" />
<!-- Log retention days [optional]; takes effect when the value is greater than 3 -->
<property name="logRetentionDays" value="${xxl.job.executor.logretentiondays}" />
</bean>
- 任务执行bean配置
xml配置定义,或者注解扫描等方式注册到容器中
- 任务执行bean注册
可通过注解@JobHandler(value="demoJobHandler")或者代码内api注册方式
2.2 XxlJobSpringExecutor初始化函数
- XxlJobSpringExecutor通过接口ApplicationContextAware获取容器bean
- XxlJobSpringExecutor.start()初始化
/**
 * Starts the Spring-based executor: processes the job-handler annotations using the
 * application context, refreshes the GlueFactory instance, then delegates to the
 * parent class's start().
 *
 * @throws Exception if any of the initialization steps fails
 */
public void start() throws Exception {
// process beans annotated with @JobHandler
initJobHandlerRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
super.start();
}
- 从容器中获取注解的任务处理bean,以注解值为key,目标bean为value保存到
ConcurrentHashMap<String, IJobHandler> jobHandlerRepository
中。后续收到任务消息时,查找对应的任务处理bean
// Body of the @JobHandler registration routine (its method header is not part of this excerpt).
// Nothing to scan when no application context is available.
if (applicationContext == null) {
return;
}
// Fetch every bean in the container that is annotated with @JobHandler
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// beans that do not implement IJobHandler are skipped
if (serviceBean instanceof IJobHandler){
// NOTE(review): getAnnotation(...) returns null when the runtime class itself does not carry
// the annotation (e.g. a proxy subclass), which would throw a NullPointerException here — confirm
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) {// a handler name may only be registered once
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
}
// store the bean keyed by its annotation value
registJobHandler(name, handler);
}
}
}
}
2.3 XxlJobExecutor初始化函数
- 1 初始化日志路径,任务执行bean内部日志输出到该路径下
- 2 初始化访问调度服务端服务的xxlrpc的调用代理接口
- 3 初始化线程,定时清理任务执行日志
- 4 初始化线程,异步向服务端更新调度任务执行状态。
- 5 初始化执行器端服务的xxlrpc的provider端。
/**
 * Starts the executor core. The numbered steps match the numbered list in the notes
 * that precede this snippet.
 *
 * @throws Exception if any of the initialization steps fails
 */
public void start() throws Exception {
XxlJobFileAppender.initLogPath(logPath);// 1: initialize the log path
initAdminBizList(adminAddresses, accessToken);// 2: build the admin-side client list from the configured addresses and token
JobLogFileCleanThread.getInstance().start(logRetentionDays);// 3: start the log clean-up thread
TriggerCallbackThread.getInstance().start();// 4: start the thread that reports execution results
// fall back to an automatically found port when none (<= 0) is configured;
// presumably the search starts at 9999 — TODO confirm against NetUtil
port = port>0?port: NetUtil.findAvailablePort(9999);
// fall back to the detected IP when none is configured
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);// 5: initialize the executor-side RPC provider
}
三 日志管理
- 生成日志文件名称及路径: 根路径/日期/日志id.log
/**
 * Builds the log file path for one trigger: {@code <logPath>/<yyyy-MM-dd>/<logId>.log},
 * creating the per-day directory when it does not exist yet.
 *
 * @param triggerDate date used to pick the per-day directory
 * @param logId       id used as the file name
 * @return the path of the log file (the file itself is not created here)
 */
public static String makeLogFileName(Date triggerDate, int logId) {
    // SimpleDateFormat is not thread-safe, so a new instance is created per call instead of a shared static
    SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");

    // <logPath>/yyyy-MM-dd
    File dayDir = new File(getLogPath(), dayFormat.format(triggerDate));
    if (!dayDir.exists()) {
        // mkdirs() also creates a missing base log directory; mkdir() would fail silently in that case
        dayDir.mkdirs();
    }

    // <logPath>/yyyy-MM-dd/<logId>.log
    return dayDir.getPath()
            .concat(File.separator)
            .concat(String.valueOf(logId))
            .concat(".log");
}
- 任务执行线程的线程变量
InheritableThreadLocal<String> contextHolder
存储日志路径,用于查询和保存日志
- 写日志,任务执行bean内部调用写任务执行相关日志
/**
 * Appends one line of text to the given log file, creating the file if necessary.
 * Failures are logged and never propagated to the caller.
 *
 * @param logFileName path of the log file; the call is a no-op when null or blank
 * @param appendLog   text to append; null is written as an empty line
 */
public static void appendLog(String logFileName, String appendLog) {
    if (logFileName == null || logFileName.trim().length() == 0) {
        return;
    }

    File logFile = new File(logFileName);
    if (!logFile.exists()) {
        try {
            logFile.createNewFile();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return;
        }
    }

    // every entry is terminated with a line break
    String line = (appendLog == null ? "" : appendLog) + "\r\n";

    // try-with-resources closes the stream on every path; 'true' opens the file in append mode
    try (FileOutputStream fos = new FileOutputStream(logFile, true)) {
        fos.write(line.getBytes("utf-8"));
        fos.flush();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
- 查询日志,调度服务端通过xxlrpc接口查询任务执行日志
/**
 * Reads a log file from a given line number to the end of the file.
 *
 * @param logFileName path of the log file
 * @param fromLineNum first line number to include in the result
 * @return a LogResult carrying the requested start line, the number of the last line read and
 *         the collected text; when the file name is blank or the file does not exist, a result
 *         with an error message and the last flag set to {@code true} is returned instead
 */
public static LogResult readLog(String logFileName, int fromLineNum){
    // validate the file
    if (logFileName == null || logFileName.trim().length() == 0) {
        return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
    }
    File logFile = new File(logFileName);
    if (!logFile.exists()) {
        return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);
    }

    // the builder is method-local, so the unsynchronized StringBuilder is sufficient
    StringBuilder content = new StringBuilder();
    int toLineNum = 0;

    // the file stream is its own resource so it is closed even if creating the reader fails
    try (FileInputStream in = new FileInputStream(logFile);
         LineNumberReader reader = new LineNumberReader(new InputStreamReader(in, "utf-8"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            toLineNum = reader.getLineNumber();
            // lines before fromLineNum are read but not returned
            if (toLineNum >= fromLineNum) {
                content.append(line).append("\n");
            }
        }
    } catch (IOException e) {
        // whatever was read before the failure is still returned
        logger.error(e.getMessage(), e);
    }

    return new LogResult(fromLineNum, toLineNum, content.toString(), false);
}
3.1 日志文件清理
// Background thread that, once a day, deletes the per-day log directories (named yyyy-MM-dd)
// whose date is at least logRetentionDays days before today.
localThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!toStop) {
            try {
                File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                if (childDirs != null && childDirs.length > 0) {

                    // today at 00:00:00.000
                    Calendar todayCal = Calendar.getInstance();
                    todayCal.set(Calendar.HOUR_OF_DAY, 0);
                    todayCal.set(Calendar.MINUTE, 0);
                    todayCal.set(Calendar.SECOND, 0);
                    todayCal.set(Calendar.MILLISECOND, 0);
                    Date todayDate = todayCal.getTime();

                    // TimeUnit converts in long arithmetic; the expression
                    // logRetentionDays * (24 * 60 * 60 * 1000) overflows int from 25 days upwards
                    // when the day count is an int
                    long retentionMillis = TimeUnit.DAYS.toMillis(logRetentionDays);

                    // only used by this thread, so one formatter per pass is safe
                    SimpleDateFormat dirNameFormat = new SimpleDateFormat("yyyy-MM-dd");

                    for (File childFile : childDirs) {
                        // only directories whose name looks like a date are candidates
                        if (!childFile.isDirectory()) {
                            continue;
                        }
                        if (childFile.getName().indexOf("-") == -1) {
                            continue;
                        }

                        // the directory name carries the log date
                        Date logFileCreateDate;
                        try {
                            logFileCreateDate = dirNameFormat.parse(childFile.getName());
                        } catch (ParseException e) {
                            logger.error(e.getMessage(), e);
                            continue;
                        }
                        if (logFileCreateDate == null) {
                            continue;
                        }

                        // delete the whole directory of an expired day
                        if ((todayDate.getTime() - logFileCreateDate.getTime()) >= retentionMillis) {
                            FileUtil.deleteRecursively(childFile);
                        }
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            // one clean-up pass per day
            try {
                TimeUnit.DAYS.sleep(1);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");
    }
});
四 执行器注册
- 向调度服务端发送注册信息,服务端可知道应用可用于任务执行的执行器信息
// Heartbeat thread: while the executor is running it periodically registers this executor
// with the admin side; once the stop flag is set it sends a registry-remove request.
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {// heartbeat payload: registry type, app name, address
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {// try each admin endpoint in turn
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;// one successful registration is enough
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
// a failing endpoint is only logged; the next endpoint is tried
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {// wait BEAT_TIMEOUT seconds before sending the next heartbeat
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
// an interrupt only shortens the wait; the loop condition decides whether to stop
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
// registry remove
try {// the executor is going offline: send the registry-remove request
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;// one successful removal is enough
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
});
五 任务执行结果上报
5.1 执行完成时上报
-
LinkedBlockingQueue<HandleCallbackParam> callBackQueue
保存需上报的日志id(logId)、日志时间(logDateTim)及任务执行结果
- 异步线程通过queue获取执行结果,并上报
// Worker thread that takes finished-job results from callBackQueue and reports them in batches.
// After the stop flag is set, whatever is still queued is reported once more before the thread ends.
triggerCallbackThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!toStop) {
            try {
                // blocks until a result is available
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if (callback != null) {
                    // batch everything else that is already queued; the taken element is appended last
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);
                    // the list holds at least the taken element, so no emptiness check is needed
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        // final pass before the thread ends: report the results still waiting in the queue
        try {
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            getInstance().callBackQueue.drainTo(callbackParamList);
            if (!callbackParamList.isEmpty()) {
                doCallback(callbackParamList);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
    }
});
- 执行上报函数
/**
 * Sends a batch of execution results to the admin side, trying each configured endpoint
 * until one accepts the batch. The outcome of every attempt is written through callbackLog.
 * When no endpoint accepts the batch, it is appended to the fail-callback file.
 *
 * @param callbackParamList results to report
 */
private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean delivered = false;

    for (AdminBiz admin : XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> reply = admin.callback(callbackParamList);
            boolean accepted = reply != null && reply.getCode() == ReturnT.SUCCESS_CODE;
            if (!accepted) {
                // rejected by this endpoint: record it and move on to the next one
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + reply);
                continue;
            }
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
            delivered = true;
            break;
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }

    // nothing accepted the batch: store it in the fail-callback file
    if (!delivered) {
        appendFailCallbackFile(callbackParamList);
    }
}
/**
 * Serializes the batch with JacksonUtil and appends it as one line to the fail-callback file.
 *
 * @param callbackParamList results that could not be reported
 */
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
    FileUtil.appendFileLine(failCallbackFileName, JacksonUtil.writeValueAsString(callbackParamList));
}
5.2 上报失败时补偿重试
- 这里可能存在文件读写的并发问题
- 异步线程定时读取上报失败文件,重新上报任务执行状态
// Retry thread: every BEAT_TIMEOUT seconds it re-processes the fail-callback file
// until the stop flag is set.
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while(!toStop){
try {
retryFailCallbackFile();
} catch (Exception e) {
// a failed retry pass is only logged; the next pass runs after the sleep below
logger.error(e.getMessage(), e);
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
// an interrupt only shortens the wait; the loop condition decides whether to stop
logger.warn(">>>>>>>>>>> xxl-job, executor retry callback thread interrupted, error msg:{}", e.getMessage());
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
}
});
- 失败上报补偿处理
/**
 * Reloads the results stored in the fail-callback file and reports them again in pages of
 * at most 100 entries. The file is deleted right after it has been read.
 */
private void retryFailCallbackFile(){
    // read the stored lines, then delete the file
    List<String> fileLines = FileUtil.loadFileLines(failCallbackFileName);
    FileUtil.deleteFile(failCallbackFileName);
    if (fileLines == null || fileLines.isEmpty()) {
        return;
    }

    // each line holds one serialized batch of callback params
    List<HandleCallbackParam> failCallbackParamList = new ArrayList<>();
    for (String line : fileLines) {
        List<HandleCallbackParam> parsed = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class);
        if (parsed != null && !parsed.isEmpty()) {
            failCallbackParamList.addAll(parsed);
        }
    }

    // retry callback, 100 entries per page; flushing on the page size keeps every page,
    // including the first one, at exactly pageSize entries (the index-based test
    // "i > 0 && i % pageSize == 0" let the first page grow to 101)
    final int pageSize = 100;
    List<HandleCallbackParam> pageData = new ArrayList<>(pageSize);
    for (HandleCallbackParam param : failCallbackParamList) {
        pageData.add(param);
        if (pageData.size() == pageSize) {
            doCallback(pageData);
            pageData.clear();
        }
    }
    // remaining partial page
    if (!pageData.isEmpty()) {
        doCallback(pageData);
    }
}
六 任务执行
- 1 任务处理函数的预初始化模版函数调用
- 2 idleTimes,无任务执行的空转次数,达到指定次数则结束线程
- 3 blockqueue等待任务
- 4 删除日志id(logId)记录,该记录用于避免任务重复触发
- 5 线程变量记录任务日志文件名称
- 6 线程变量记录分片任务信息
- 7 任务超时配置,则使用FutureTask限时等待任务完成,超时未完成则interrupt中止任务,返回超时结果
- 8 无超时配置,直接调用任务执行函数执行任务
- 9 任务结果通知给上报线程,成功或者失败
- 10 线程结束,还存在待执行任务,则上报任务执行失败
- 11 执行任务执行实现类的模版方法destroy,执行一些清理工作
/**
 * Main loop of a job thread: initializes the handler, then repeatedly polls the trigger
 * queue, executes each trigger (optionally with a timeout) and pushes the result to the
 * callback thread. After the stop flag is set, triggers still queued are reported as
 * failed and the handler's destroy() hook is invoked. The numbered comments match the
 * numbered list in the notes that precede this snippet.
 */
public void run() {
try {
handler.init();// 1: handler init hook; a failure is logged and does not prevent the loop from starting
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
while(!toStop){
running = false;
idleTimes++;// 2: counts consecutive polls that produced no trigger
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);// 3: wait up to 3 seconds for the next trigger
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());// 4: drop this trigger's log id from the pending set
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);// 5: keep the log file name in the thread-bound context holder
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));// 6: record the sharding index/total for this run
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
if (triggerParam.getExecutorTimeout() > 0) {// 7: a timeout is configured
// run the handler on a separate thread and wait at most the configured number of seconds
Thread futureThread = null;
try {
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
XxlJobLogger.log(e);
executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
} finally {
// the helper thread is interrupted whether or not the task finished in time
// NOTE(review): futureThread is still null here if an exception occurs before it is assigned — confirm that cannot happen
futureThread.interrupt();
}
} else {// 8: no timeout configured, execute directly on this thread
executeResult = handler.execute(triggerParam.getExecutorParams());
}
// a handler that returns null is treated as failed
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
// more than 30 consecutive empty polls: ask the executor to remove this job thread
if (idleTimes > 30) {
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
// turn the stack trace into the failure message of the result
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {// 9: hand the outcome to the callback thread
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// normal case: report the result produced above
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
} else {
// the thread was stopped while this trigger was being handled: report a failure with the stop reason
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
}
}
}
}
while(triggerQueue !=null && triggerQueue.size()>0){// 10: triggers still queued after the stop are reported as failed
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
}
}
try {// 11: handler destroy hook
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// note: the "hashCode" placeholder is filled with the current Thread object
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
七 执行器端xxlrpc服务
7.1 空状态检测
- 检测任务线程是否有待执行任务
/**
 * Idle check for a job: succeeds only when the job has no thread, or its thread is
 * neither running nor holding queued triggers.
 *
 * @param jobId id of the job to check
 * @return {@code ReturnT.SUCCESS} when idle, otherwise a FAIL_CODE result
 */
public ReturnT<String> idleBeat(int jobId) {
    JobThread worker = XxlJobExecutor.loadJobThread(jobId);
    if (worker == null || !worker.isRunningOrHasQueue()) {
        return ReturnT.SUCCESS;
    }
    return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
}
7.2 停止任务
- 获取目标任务线程
/**
 * Stops the thread of the given job, if one exists.
 *
 * @param jobId id of the job to stop
 * @return {@code ReturnT.SUCCESS} after the thread was removed; a SUCCESS_CODE result with an
 *         explanatory message when no thread was registered for the job
 */
public ReturnT<String> kill(int jobId) {
    // no thread registered for this job: nothing to stop
    if (XxlJobExecutor.loadJobThread(jobId) == null) {
        return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed.");
    }
    XxlJobExecutor.removeJobThread(jobId, "scheduling center kill job.");
    return ReturnT.SUCCESS;
}
- 设置线程停止标记,interrupt通知线程中断
/**
 * Removes the thread registered for a job and, if there was one, flags it to stop and
 * interrupts it.
 *
 * @param jobId           id of the job whose thread is removed
 * @param removeOldReason reason passed on to the stopped thread
 */
public static void removeJobThread(int jobId, String removeOldReason){
    JobThread removed = jobThreadRepository.remove(jobId);
    if (removed == null) {
        return;
    }
    // set the stop flag first, then interrupt the thread
    removed.toStop(removeOldReason);
    removed.interrupt();
}
7.3 任务日志查询
- 根据日志时间和日志id获取任务日志路径,根据行号读取任务日志内容
/**
 * Returns the content of one execution log, starting at the given line.
 *
 * @param logDateTim  log time in milliseconds, used to locate the per-day directory
 * @param logId       id used as the log file name
 * @param fromLineNum first line number to return
 * @return the read result wrapped in a ReturnT
 */
public ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum) {
    // log file: logPath/yyyy-MM-dd/<logId>.log
    Date logDate = new Date(logDateTim);
    String path = XxlJobFileAppender.makeLogFileName(logDate, logId);
    return new ReturnT<LogResult>(XxlJobFileAppender.readLog(path, fromLineNum));
}
7.4 任务触发
- 根据任务类型,获取任务对应的执行handler。
java bean类型,则是用户自行定义的任务执行handler
GLUE_GROOVY类型,则使用GlueJobHandler
脚本类型,则ScriptJobHandler
- 若任务id已存在对应任务线程,则根据并发策略处理:
SERIAL_EXECUTION("Serial execution"),// run triggers one after another
DISCARD_LATER("Discard Later"),// discard the later trigger
COVER_EARLY("Cover Early");// the later trigger replaces the earlier one
- 任务id不存在对应任务线程,则初始化任务线程。
此时若已有任务线程则通知停止,表示任务并发策略使用的是COVER_EARLY覆盖策略
/**
 * Creates and starts a new thread for a job and registers it in the repository. A thread
 * previously registered under the same job id is flagged to stop and interrupted.
 *
 * @param jobId           id of the job
 * @param handler         handler the new thread executes
 * @param removeOldReason reason passed on to a replaced thread
 * @return the newly started job thread
 */
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    JobThread fresh = new JobThread(jobId, handler);
    fresh.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    // put(...) hands back the thread that was registered before, if any
    JobThread replaced = jobThreadRepository.put(jobId, fresh);
    if (replaced == null) {
        return fresh;
    }
    replaced.toStop(removeOldReason);
    replaced.interrupt();
    return fresh;
}
- blockqueue通知任务线程执行任务
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
7.4.1 ScriptJobHandler
- 使用脚本内容创建脚本文件
/**
 * Executes the job's script: writes the script source to a file when that file does not
 * exist yet, then runs it with the glue type's command, passing the job parameter and the
 * sharding index/total as arguments and sending the script output to the job's log file.
 *
 * @param param job parameter, passed to the script as its first argument
 * @return {@code IJobHandler.SUCCESS} when the script exits with 0, otherwise a failure result
 * @throws Exception if preparing or running the script fails
 */
public ReturnT<String> execute(String param) throws Exception {
    // only script glue types are handled here
    if (!glueType.isScript()) {
        return new ReturnT<String>(IJobHandler.FAIL.getCode(), "glueType["+ glueType +"] invalid.");
    }

    // command used to run the script
    String interpreter = glueType.getCmd();

    // script file: <glueSrcPath>/<jobId>_<glueUpdatetime><suffix>
    String scriptPath = XxlJobFileAppender.getGlueSrcPath()
            .concat(File.separator + jobId + "_" + glueUpdatetime)
            .concat(glueType.getSuffix());
    if (!new File(scriptPath).exists()) {
        ScriptUtil.markScriptFile(scriptPath, gluesource);
    }

    // log file of the current run, taken from the thread-bound context holder
    String logPath = XxlJobFileAppender.contextHolder.get();

    // script arguments: 0 = param, 1 = sharding index, 2 = sharding total
    ShardingUtil.ShardingVO sharding = ShardingUtil.getShardingVo();
    String[] args = {
            param,
            String.valueOf(sharding.getIndex()),
            String.valueOf(sharding.getTotal())
    };

    // invoke
    XxlJobLogger.log("----------- script file:"+ scriptPath +" -----------");
    int exitValue = ScriptUtil.execToFile(interpreter, scriptPath, logPath, args);
    if (exitValue != 0) {
        return new ReturnT<String>(IJobHandler.FAIL.getCode(), "script exit value("+exitValue+") is failed");
    }
    return IJobHandler.SUCCESS;
}
- DefaultExecutor调用脚本命令执行脚本文件,接收脚本输出日志保存到任务日志文件中
/**
 * Runs {@code command scriptFile params...} and appends both the standard output and the
 * error output of the process to the given log file.
 *
 * @param command    command used to run the script
 * @param scriptFile path of the script file, passed as the first argument
 * @param logFile    file the process output is appended to
 * @param params     additional arguments for the script; may be null or empty
 * @return the exit value of the process, or -1 when running it raised an exception
 * @throws IOException declared by the signature; exceptions raised in the body are logged and mapped to -1
 */
public static int execToFile(String command, String scriptFile, String logFile, String... params) throws IOException {
    FileOutputStream logStream = null;
    try {
        // append mode; the same stream receives stdout and stderr, no stdin is supplied
        logStream = new FileOutputStream(logFile, true);

        // command line: <command> <scriptFile> [params...]
        CommandLine cmdLine = new CommandLine(command);
        cmdLine.addArgument(scriptFile);
        if (params != null && params.length > 0) {
            cmdLine.addArguments(params);
        }

        // exec
        DefaultExecutor executor = new DefaultExecutor();
        executor.setExitValues(null);
        executor.setStreamHandler(new PumpStreamHandler(logStream, logStream, null));
        return executor.execute(cmdLine);    // exit code: 0=success, 1=error
    } catch (Exception e) {
        XxlJobLogger.log(e);
        return -1;
    } finally {
        // closed manually so that a close failure is only logged and never alters the returned value
        if (logStream != null) {
            try {
                logStream.close();
            } catch (IOException e) {
                XxlJobLogger.log(e);
            }
        }
    }
}
7.4.2 GlueJobHandler
- 初始化
// Handler selection for GLUE_GROOVY triggers (excerpt from a larger method).
if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// The existing job thread is reused only when its handler is a GlueJobHandler whose
// glue update time equals the one carried by this trigger.
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// handler type changed or glue source updated: the old thread has to be replaced
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// No usable handler: load a new handler instance from the trigger's glue source
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
// loading failed: log it and return a failure result carrying the exception message
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
}
- 执行
/**
 * Logs the glue version (its update time) and delegates execution to the wrapped handler.
 *
 * @param param job parameter passed through to the wrapped handler
 * @return the result of the wrapped handler
 * @throws Exception whatever the wrapped handler throws
 */
public ReturnT<String> execute(String param) throws Exception {
    String versionBanner = "----------- glue.version:"+ glueUpdatetime +" -----------";
    XxlJobLogger.log(versionBanner);
    return jobHandler.execute(param);
}