聊聊powerjob的maxResultLength

本文主要研究一下powerjob的maxResultLength

maxResultLength

powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java

@Getter
@Setter
public class PowerJobWorkerConfig {

    //......

    /**
     * Max length of response result. Result that is longer than the value will be truncated.
     * {@link ProcessResult} max length for #msg
     */
    private int maxResultLength = 8096;

    //......    
}

maxResultLength默认为8096,它定义了返回的result的最大长度

innerRun

powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java

    public void innerRun() throws InterruptedException {

        final BasicProcessor processor = processorBean.getProcessor();

        String taskId = task.getTaskId();
        Long instanceId = task.getInstanceId();

        log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
        ThreadLocalStore.setTask(task);
        ThreadLocalStore.setRuntimeMeta(workerRuntime);

        // 0. 构造任务上下文
        WorkflowContext workflowContext = constructWorkflowContext();
        TaskContext taskContext = constructTaskContext();
        taskContext.setWorkflowContext(workflowContext);
        // 1. 上报执行信息
        reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);

        ProcessResult processResult;
        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

        // 2. 根任务 & 广播执行 特殊处理
        if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
            // 广播执行:先选本机执行 preProcess,完成后 TaskTracker 再为所有 Worker 生成子 Task
            handleBroadcastRootTask(instanceId, taskContext);
            return;
        }

        // 3. 最终任务特殊处理(一定和 TaskTracker 处于相同的机器)
        if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
            handleLastTask(taskId, instanceId, taskContext, executeType);
            return;
        }

        // 4. 正式提交运行
        try {
            processResult = processor.process(taskContext);
            if (processResult == null) {
                processResult = new ProcessResult(false, "ProcessResult can't be null");
            }
        } catch (Throwable e) {
            log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
            processResult = new ProcessResult(false, e.toString());
        }
        reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
    }

innerRun方法最后一步执行reportStatus,它会对processResult.getMsg()进行suit

suit

    private String suit(String result) {

        if (StringUtils.isEmpty(result)) {
            return "";
        }
        final int maxLength = workerRuntime.getWorkerConfig().getMaxResultLength();
        if (result.length() <= maxLength) {
            return result;
        }
        log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.",
                task.getInstanceId(), task.getTaskId(), result.length(), maxLength);
        return result.substring(0, maxLength).concat("...");
    }

suit方法就是根据workerRuntime.getWorkerConfig().getMaxResultLength()来判断,超过maxLength的则取maxLength长度,后面加...

onReceiveProcessorReportTaskStatusReq

powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java

    @Handler(path = WTT_HANDLER_REPORT_TASK_STATUS)
    public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {

        int taskStatus = req.getStatus();
        // 只有重量级任务才会有两级任务状态上报的机制
        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());

        // 手动停止 TaskTracker 的情况下会出现这种情况
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
            return null;
        }

        if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
            taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
        }

        taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());

        // 更新工作流上下文
        taskTracker.updateAppendedWfContext(req.getAppendedWfContext());

        // 结束状态需要回复接受成功
        if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) {
            return AskResponse.succeed(null);
        }

        return null;
    }

onReceiveProcessorReportTaskStatusReq方法接收到ProcessorReportTaskStatusReq则执行taskTracker.updateTaskStatus

updateTaskStatus

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {

        if (finished.get()) {
            return;
        }
        TaskStatus nTaskStatus = TaskStatus.of(newStatus);

        int lockId = taskId.hashCode();
        try {

            // 阻塞获取锁
            segmentLock.lockInterruptible(lockId);
            TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);

            // 缓存中不存在,从数据库查
            if (taskBriefInfo == null) {
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                if (taskOpt.isPresent()) {
                    TaskDO taskDO = taskOpt.get();
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
                } else {
                    // 理论上不存在这种情况,除非数据库异常
                    log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
                }
                // 写入缓存
                taskId2BriefInfo.put(taskId, taskBriefInfo);
            }

            // 过滤过期的请求(潜在的集群时间一致性需求,重试跨 Worker 时,时间不一致可能导致问题)
            if (taskBriefInfo.getLastReportTime() > reportTime) {
                log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
                return;
            }
            // 检查状态转移是否合法,fix issue 404
            if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
                log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
                return;
            }

            // 此时本次请求已经有效,先更新相关信息
            taskBriefInfo.setLastReportTime(reportTime);
            taskBriefInfo.setStatus(nTaskStatus);

            // 处理失败的情况
            int configTaskRetryNum = instanceInfo.getTaskRetryNum();
            if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {

                // 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况之前不会去查DB)
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                // 查询DB再失败的话,就不重试了...
                if (taskOpt.isPresent()) {
                    int failedCnt = taskOpt.get().getFailedCnt();
                    if (failedCnt < configTaskRetryNum) {

                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setFailedCnt(failedCnt + 1);

                        /*
                        地址规则:
                        1. 当前存储的地址为任务派发的目的地(ProcessorTracker地址)
                        2. 根任务、最终任务必须由TaskTracker所在机器执行(如果是根任务和最终任务,不应当修改地址)
                        3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址)
                         */
                        String taskName = taskOpt.get().getTaskName();
                        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                        if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                            updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                        }

                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        updateEntity.setLastReportTime(reportTime);

                        boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                        if (retryTask) {
                            log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                            return;
                        }
                    }
                }
            }

            // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...)
            result = result == null ? "" : result;
            boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);

            if (!updateResult) {
                log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
            }

        } catch (InterruptedException ignore) {
            // ignore
        } catch (Exception e) {
            log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
        } finally {
            segmentLock.unlock(lockId);
        }
    }

updateTaskStatus则执行taskPersistenceService.updateTaskStatus

TaskPersistenceService.updateTaskStatus

powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java

    public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
        try {
            return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result));
        }catch (Exception e) {
            log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
        }
        return false;
    }

执行的是taskDAO.updateTaskStatus

TaskDAOImpl.updateTaskStatus

powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java

    public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException {
        String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?";
        try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {

            ps.setInt(1, status);
            ps.setLong(2, lastReportTime);
            ps.setString(3, result);
            ps.setLong(4, lastReportTime);
            ps.setLong(5, instanceId);
            ps.setString(6, taskId);
            ps.executeUpdate();
            return true;
        }
    }

updateTaskStatus最后更新到task_info表

task_info

    public void initTable() throws Exception {

        String delTableSQL = "drop table if exists task_info";
        // 感谢 Gitee 用户 @Linfly 反馈的 BUG
        // bigint(20) 与 Java Long 取值范围完全一致
        String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint, sub_instance_id bigint, task_name varchar(255), task_content blob, address varchar(255), status int, result text, failed_cnt int, created_time bigint, last_modified_time bigint, last_report_time bigint, constraint pkey unique (instance_id, task_id))";

        try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) {
            stat.execute(delTableSQL);
            stat.execute(createTableSQL);
        }
    }

task_info定义了result字段,其类型为text

小结

  • PowerJobWorkerConfig的maxResultLength默认为8096,它定义了返回的result的最大长度
  • HeavyProcessorRunnable的innerRun方法最后一步执行reportStatus,它会对processResult.getMsg()进行suit,suit方法就是根据workerRuntime.getWorkerConfig().getMaxResultLength()来判断,超过maxLength的则取maxLength长度,后面加...
  • onReceiveProcessorReportTaskStatusReq方法接收到ProcessorReportTaskStatusReq则执行taskTracker.updateTaskStatus,最后更新到task_info表,该表定义了result字段,其类型为text
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容