xxl-job源码1-执行器

一 主体流程

  • 任务执行主体流程


    image.png

二 初始化

2.1 配置

  • 执行器xml配置
<bean id="xxlJobSpringExecutor" class="com.xxl.job.core.executor.impl.XxlJobSpringExecutor" init-method="start" destroy-method="destroy" >
    <!-- 执行器注册中心地址[选填],为空则关闭自动注册 -->
    <property name="adminAddresses" value="${xxl.job.admin.addresses}" />
    <!-- 执行器AppName[选填],为空则关闭自动注册 -->
    <property name="appName" value="${xxl.job.executor.appname}" />
    <!-- 执行器IP[选填],为空则自动获取 -->
    <property name="ip" value="${xxl.job.executor.ip}" />
    <!-- 执行器端口号[选填],小于等于0则自动获取 -->
    <property name="port" value="${xxl.job.executor.port}" />
    <!-- 访问令牌[选填],非空则进行匹配校验 -->
    <property name="accessToken" value="${xxl.job.accessToken}" />
    <!-- 执行器日志路径[选填],为空则使用默认路径 -->
    <property name="logPath" value="${xxl.job.executor.logpath}" />
    <!-- 日志保存天数[选填],值大于3时生效 -->
    <property name="logRetentionDays" value="${xxl.job.executor.logretentiondays}" />
</bean>
  • 任务执行bean配置
    xml配置定义,或者注解扫描等方式注册到容器中
  • 任务执行bean注册
    可通过注解@JobHandler(value="demoJobHandler")或者代码内api注册方式

2.2 XxlJobSpringExecutor初始化函数

  • XxlJobSpringExecutor通过接口ApplicationContextAware获取容器bean
  • XxlJobSpringExecutor.start()初始化
public void start() throws Exception {
    // 注解JobHandler处理
    initJobHandlerRepository(applicationContext);
    // refresh GlueFactory
    GlueFactory.refreshInstance(1);
    // super start
    super.start();
}
  • 从容器中获取注解的任务处理bean,以注解值为key,目标bean为value保存到ConcurrentHashMap<String, IJobHandler> jobHandlerRepository中。后续收到任务消息时,查找对应的任务处理bean
    if (applicationContext == null) {
        return;
    }

    // 根据任务处理注解类从容器中获取所有配置了注解的bean
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

    if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            if (serviceBean instanceof IJobHandler){
                String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                IJobHandler handler = (IJobHandler) serviceBean;
                if (loadJobHandler(name) != null) {//禁止重复注册
                    throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                }
//以注解值为key,目标bean为value保存到map中
                registJobHandler(name, handler);
            }
        }
    }
}

2.3 XxlJobExecutor初始化函数

  • 1 初始化日志路径,任务执行bean内部日志输出到该路径下
  • 2 初始化访问调度服务端服务的xxlrpc的调用代理接口
  • 3 初始化线程,定时清理任务执行日志
  • 4 初始化线程,异步向服务端更新调度任务执行状态。
  • 5 初始化执行器端服务的xxlrpc的provider端。
public void start() throws Exception {
    XxlJobFileAppender.initLogPath(logPath);//1
    initAdminBizList(adminAddresses, accessToken);//2
    JobLogFileCleanThread.getInstance().start(logRetentionDays);//3
    TriggerCallbackThread.getInstance().start();//4
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    initRpcProvider(ip, port, appName, accessToken);//5
}

三 日志管理

  • 生成日志文件名称及路径: 根路径/日期/任务id.log
public static String makeLogFileName(Date triggerDate, int logId) {

    // filePath/yyyy-MM-dd
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");  // avoid concurrent problem, can not be static
    File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
    if (!logFilePath.exists()) {
        logFilePath.mkdir();
    }

    // filePath/yyyy-MM-dd/9999.log
    String logFileName = logFilePath.getPath()
            .concat(File.separator)
            .concat(String.valueOf(logId))
            .concat(".log");
    return logFileName;
}
  • 任务执行线程的线程变量InheritableThreadLocal<String> contextHolder存储日志路径,查询和保存日志,
  • 写日志,任务执行bean内部调用写任务执行相关日志
public static void appendLog(String logFileName, String appendLog) {
        //从线程变量中获取日志文件路径
    if (logFileName==null || logFileName.trim().length()==0) {
        return;
    }
    File logFile = new File(logFileName);

    if (!logFile.exists()) {
        try {
            logFile.createNewFile();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return;
        }
    }

    if (appendLog == null) {
        appendLog = "";
    }
    appendLog += "\r\n";//添加换行符号
    
    FileOutputStream fos = null;
    try {//通过FileOutputStream写日志到文件中
        fos = new FileOutputStream(logFile, true);
        fos.write(appendLog.getBytes("utf-8"));
        fos.flush();//刷新到磁盘
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
        if (fos != null) {
            try {
                fos.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }   
}
  • 查询日志,调度服务端通过xxlrpc接口查询任务执行日志
public static LogResult readLog(String logFileName, int fromLineNum){
    //文件检查
    if (logFileName==null || logFileName.trim().length()==0) {
        return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
    }
    File logFile = new File(logFileName);

    if (!logFile.exists()) {
        return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);
    }

    // read file
    StringBuffer logContentBuffer = new StringBuffer();
    int toLineNum = 0;
    LineNumberReader reader = null;
    try {
        reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
        String line = null;

        while ((line = reader.readLine())!=null) {
            toLineNum = reader.getLineNumber();
            if (toLineNum >= fromLineNum) {//从指定行开始读取日志返回
                logContentBuffer.append(line).append("\n");
            }
        }
    } catch (IOException e) {
        logger.error(e.getMessage(), e);
    } finally {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    // 包含起始行,读取行数,日志内容
    LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);
    return logResult;
}

3.1 日志文件清理

localThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!toStop) {
            try {
                // clean log dir, over logRetentionDays
                File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                if (childDirs!=null && childDirs.length>0) {

                    // today
                    Calendar todayCal = Calendar.getInstance();
                    todayCal.set(Calendar.HOUR_OF_DAY,0);
                    todayCal.set(Calendar.MINUTE,0);
                    todayCal.set(Calendar.SECOND,0);
                    todayCal.set(Calendar.MILLISECOND,0);

                    Date todayDate = todayCal.getTime();

                    for (File childFile: childDirs) {

                        // valid
                        if (!childFile.isDirectory()) {
                            continue;
                        }
                        if (childFile.getName().indexOf("-") == -1) {
                            continue;
                        }

                        Date logFileCreateDate = null;
                        try {//获取日志日期
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                            logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                        } catch (ParseException e) {
                            logger.error(e.getMessage(), e);
                        }
                        if (logFileCreateDate == null) {
                            continue;
                        }

                        if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {//删除日期下的日志文件
                            FileUtil.deleteRecursively(childFile);
                        }

                    }
                }

            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            try {//每天清理一次
                TimeUnit.DAYS.sleep(1);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");

    }
});

四 执行器注册

  • 向调度服务端发送注册信息,服务端可知道应用可用于任务执行的执行器信息
registryThread = new Thread(new Runnable() {
@Override
public void run() {

    while (!toStop) {
        try {//心跳报文:应用,地址,注册类型
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {//遍历调度端地址
                try {
                    ReturnT<String> registryResult = adminBiz.registry(registryParam);
                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        registryResult = ReturnT.SUCCESS;
                        logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        break;//一个调度端注册成功即可
                    } else {
                        logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    }
                } catch (Exception e) {
                    logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                }

            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

        try {//定时发送注册报文
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        } catch (InterruptedException e) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
        }
    }

    // registry remove
    try {//执行器下线,则发送解注册报文
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;//一个调度端发送成功即可
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
            }

        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
});

五 任务执行结果上报

5.1 执行完成时上报

  • LinkedBlockingQueue<HandleCallbackParam> callBackQueue保存需上报的任务id,任务执行结果,及完成时间
  • 异步线程通过queue获取执行结果,并上报
triggerCallbackThread = new Thread(new Runnable() {

@Override
public void run() {
    while(!toStop){
        try {
            HandleCallbackParam callback = getInstance().callBackQueue.take();//休眠等待新的任务执行完成
            if (callback != null) {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                callbackParamList.add(callback);
                if (callbackParamList!=null && callbackParamList.size()>0) {//获取所有执行完成的任务,上报状态
                    doCallback(callbackParamList);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    try {//执行器停止前先上报所有执行中的任务结果
        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
        if (callbackParamList!=null && callbackParamList.size()>0) {
            doCallback(callbackParamList);
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

}
});
  • 执行上报函数
private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // callback, will retry if error
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);//遍历调度端,xxlrpc发送任务执行结果
            if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");//记录任务状态上报是否成功的日志
                callbackRet = true;
                break;
            } else {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
            }
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }
    if (!callbackRet) {//上报失败,则记录到本地上报失败记录文件中,等待重试
        appendFailCallbackFile(callbackParamList);
    }
}

private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
    // append file
    String content = JacksonUtil.writeValueAsString(callbackParamList);
    FileUtil.appendFileLine(failCallbackFileName, content);
}

5.2 上报失败时补偿重试

  • 这里可能存在文件读写的并发问题
  • 异步线程定时读取上报失败文件,重新上报任务执行状态
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
    while(!toStop){
        try {
            retryFailCallbackFile();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        try {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        } catch (InterruptedException e) {
            logger.warn(">>>>>>>>>>> xxl-job, executor retry callback thread interrupted, error msg:{}", e.getMessage());
        }
    }
    logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
}
});
  • 失败上报补偿处理
private void retryFailCallbackFile(){
    // 文件中读取失败上报的任务,并删除失败上报文件
    List<String> fileLines = FileUtil.loadFileLines(failCallbackFileName);
    FileUtil.deleteFile(failCallbackFileName);

    // parse
    List<HandleCallbackParam> failCallbackParamList = new ArrayList<>();
    if (fileLines!=null && fileLines.size()>0) {
        for (String line: fileLines) {
            List<HandleCallbackParam> failCallbackParamListTmp = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class);//解析成任务上报信息结构
            if (failCallbackParamListTmp!=null && failCallbackParamListTmp.size()>0) {
                failCallbackParamList.addAll(failCallbackParamListTmp);
            }
        }
    }

    // retry callback, 100 lines per page
    if (failCallbackParamList!=null && failCallbackParamList.size()>0) {
        int pagesize = 100;
        List<HandleCallbackParam> pageData = new ArrayList<>();
        for (int i = 0; i < failCallbackParamList.size(); i++) {
            pageData.add(failCallbackParamList.get(i));
            if (i>0 && i%pagesize == 0) {
                doCallback(pageData);//分批上报
                pageData.clear();
            }
        }
        if (pageData.size() > 0) {
            doCallback(pageData);
        }
    }
}

六 任务执行

  • 1 任务处理函数的预初始化模版函数调用
  • 2 idleTimes,无任务执行的空转次数,达到指定次数则结束线程
  • 3 blockqueue等待任务
  • 4 删除任务id记录,id记录用于避免任务重复触发
  • 5 线程变量记录任务日志文件名称,
  • 6 线程变量记录分片任务信息
  • 7 任务超时配置,则使用FutureTask限时等待任务完成,超时未完成则interrupt中止任务,返回超时结果
  • 8 无超时配置,直接调用任务执行函数执行任务
  • 9 任务结果通知给上报线程,成功或者失败
  • 10 线程结束,还存在待执行任务,则上报任务执行失败
  • 11 执行任务执行实现类的模版方法destroy,执行一些清理工作
public void run() {
    try {
        handler.init();//1
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    while(!toStop){
        running = false;
        idleTimes++;//2

        TriggerParam triggerParam = null;
        ReturnT<String> executeResult = null;
        try {
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);//3
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());//4
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
                XxlJobFileAppender.contextHolder.set(logFileName);//5
                ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));//6

                // execute
                XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

                if (triggerParam.getExecutorTimeout() > 0) {//7
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        final TriggerParam triggerParamTmp = triggerParam;
                        FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                            @Override
                            public ReturnT<String> call() throws Exception {
                                return handler.execute(triggerParamTmp.getExecutorParams());
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                        XxlJobLogger.log(e);

                        executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {//8
                    executeResult = handler.execute(triggerParam.getExecutorParams());
                }

                if (executeResult == null) {
                    executeResult = IJobHandler.FAIL;
                }
                XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

            } else {
                if (idleTimes > 30) {
                    XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

            XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {//9
            if(triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // commonm
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
                } else {
                    // is killed
                    ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
                }
            }
        }
    }

    while(triggerQueue !=null && triggerQueue.size()>0){//10
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
        }
    }

    try {//11
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

七 执行器端xxlrpc服务

7.1 空状态检测

  • 检测任务线程是否有待执行任务
public ReturnT<String> idleBeat(int jobId) {

    // isRunningOrHasQueue
    boolean isRunningOrHasQueue = false;
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread != null && jobThread.isRunningOrHasQueue()) {
        isRunningOrHasQueue = true;
    }

    if (isRunningOrHasQueue) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
    }
    return ReturnT.SUCCESS;
}

7.2 停止任务

  • 获取目标任务线程
public ReturnT<String> kill(int jobId) {
    // kill handlerThread, and create new one
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread != null) {
        XxlJobExecutor.removeJobThread(jobId, "scheduling center kill job.");
        return ReturnT.SUCCESS;
    }

    return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed.");
}
  • 设置线程停止标记,interrupt通知线程中断
public static void removeJobThread(int jobId, String removeOldReason){
    JobThread oldJobThread = jobThreadRepository.remove(jobId);
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }
}

7.3 任务日志查询

  • 任务id获取任务日志路径,根据行号读取任务日志内容
public ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum) {
    // log filename: logPath/yyyy-MM-dd/9999.log
    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId);

    LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum);
    return new ReturnT<LogResult>(logResult);
}

7.4 任务触发

  • 根据任务类型,获取任务对应的执行handler。
    java bean类型,则是用户自行定义的任务执行handler
    GLUE_GROOVY类型,则使用GlueJobHandler
    脚本类型,则ScriptJobHandler
  • 若任务id已存在对应任务线程,则根据并发策略处理,
SERIAL_EXECUTION("Serial execution"),//串行执行任务
DISCARD_LATER("Discard Later"),//丢弃后续任务
COVER_EARLY("Cover Early");//后续任务覆盖之前任务
  • 任务id不存在对应任务线程,则初始化任务线程。
    此时若已有任务线程则通知停止,表示任务并发策略使用的是COVER_EARLY覆盖策略
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    JobThread newJobThread = new JobThread(jobId, handler);
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);  // putIfAbsent | oh my god, map's put method return the old value!!!
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }

    return newJobThread;
}
  • blockqueue通知任务线程执行任务
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

7.4.1 ScriptJobHandler

  • 使用脚本内容创建脚本文件
public ReturnT<String> execute(String param) throws Exception {

    if (!glueType.isScript()) {
        return new ReturnT<String>(IJobHandler.FAIL.getCode(), "glueType["+ glueType +"] invalid.");
    }

    // cmd
    String cmd = glueType.getCmd();

    // make script file
    String scriptFileName = XxlJobFileAppender.getGlueSrcPath()
            .concat(File.separator)
            .concat(String.valueOf(jobId))
            .concat("_")
            .concat(String.valueOf(glueUpdatetime))
            .concat(glueType.getSuffix());
    File scriptFile = new File(scriptFileName);
    if (!scriptFile.exists()) {
        ScriptUtil.markScriptFile(scriptFileName, gluesource);
    }

    // log file
    String logFileName = XxlJobFileAppender.contextHolder.get();

    // script params:0=param、1=分片序号、2=分片总数
    ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
    String[] scriptParams = new String[3];
    scriptParams[0] = param;
    scriptParams[1] = String.valueOf(shardingVO.getIndex());
    scriptParams[2] = String.valueOf(shardingVO.getTotal());

    // invoke
    XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------");
    int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName, scriptParams);

    if (exitValue == 0) {
        return IJobHandler.SUCCESS;
    } else {
        return new ReturnT<String>(IJobHandler.FAIL.getCode(), "script exit value("+exitValue+") is failed");
    }

}
  • DefaultExecutor调用脚本命令执行脚本文件,接收脚本输出日志保存到任务日志文件中
public static int execToFile(String command, String scriptFile, String logFile, String... params) throws IOException {
    // 标准输出:print (null if watchdog timeout)
    // 错误输出:logging + 异常 (still exists if watchdog timeout)
    // 标准输入

    FileOutputStream fileOutputStream = null;   //
    try {
        fileOutputStream = new FileOutputStream(logFile, true);
        PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null);

        // command
        CommandLine commandline = new CommandLine(command);
        commandline.addArgument(scriptFile);
        if (params!=null && params.length>0) {
            commandline.addArguments(params);
        }

        // exec
        DefaultExecutor exec = new DefaultExecutor();
        exec.setExitValues(null);
        exec.setStreamHandler(streamHandler);
        int exitValue = exec.execute(commandline);  // exit code: 0=success, 1=error
        return exitValue;
    } catch (Exception e) {
        XxlJobLogger.log(e);
        return -1;
    } finally {
        if (fileOutputStream != null) {
            try {
                fileOutputStream.close();
            } catch (IOException e) {
                XxlJobLogger.log(e);
            }

        }
    }
}

7.4.2 GlueJobHandler

  • 初始化
if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

    // valid old jobThread
    if (jobThread != null &&
            !(jobThread.getHandler() instanceof GlueJobHandler
                && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
        // change handler or gluesource updated, need kill old thread
        removeOldReason = "change job source or glue type, and terminate the old job thread.";

        jobThread = null;
        jobHandler = null;
    }

    // valid handler
    if (jobHandler == null) {
        try {
            IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
            jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
        }
    }
}
  • 执行
public ReturnT<String> execute(String param) throws Exception {
    XxlJobLogger.log("----------- glue.version:"+ glueUpdatetime +" -----------");
    return jobHandler.execute(param);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 在一个方法内部定义的变量都存储在栈中,当这个函数运行结束后,其对应的栈就会被回收,此时,在其方法体中定义的变量将不...
    Y了个J阅读 4,413评论 1 14
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,256评论 0 9
  • 一 基础篇 1.1 Java基础 面向对象的特征抽象:将一类对象的共同特征总结出来构建类的过程。继承:对已有类的一...
    essential_note阅读 689评论 0 0
  • 这是我跑步的第八天。原本打算跑一天歇一天,这样对身体伤害更小一些。但是运动就是让人上瘾的东西,加上本身身体没有什么...
    森晴小语阅读 922评论 4 52
  • 姓名:楼灵芝 单位:杭州熙林服饰 【日精进打卡第155天】 【知~学习】 《六项精进》背诵1遍,共601遍; 《大...
    心镜_8ef4阅读 100评论 0 0