go语言API完成复杂任务流程工作实践

背景介绍

通过API执行一个复杂的任务(包含多个子任务)时,做成异步接口是一个比较好的选择。但是这样做有几个问题就要解决:

  1. 任务执行到一半,宿主机宕机或重启,任务如何保证可靠性(任务继续执行完成)。
  2. 客户端通过API可以随时查看任务的执行进展(子任务完成时刷新状态)。
  3. 执行子任务后失败后要支持对前面的子任务进行回滚(FailRollBack), 回滚的流程按照子任务的逆向流程执行。
    举例:任务流程:子任务1 -> 子任务2 -> 子任务3 -> 子任务4,在子任务3失败时,按照 子任务3 -> 子任务2 -> 子任务1顺序进行回滚(回滚流程支持灵活配置)
  4. 子任务执行成功要支持执行SuccessCallBack
    举例: 执行子任务成功后,刷新任务的执行的进展为当前子任务执行成功,可以放在SuccessCallBack执行
  5. 子任务执行失败要支持执行FailCallBack
    举例: 执行子任务失败时,需要刷新任务状态为failed,可以放在FailCallBack执行

解决方案

1. API服务通过主备的方式部署(可靠性)。

go使用Gin框架,能够支持几千的并发访问。go协程可以支持本实例并发任务处理。要是任务量过大,需要多实例并行处理,建议使用基于事件的主从模式(本文不做讨论)。
主备模式的实现参考k8s的控制器互斥锁机制(宿主机宕机或重启时,通过K8s基础设施完成备实例切换成主实例)
代码:

import
(
...
"k8s.io/client-go/tools/leaderelection/resourcelock"
...
)

func Run(ctx context.Context, config *appconfig.Config, stopCh <-chan struct{}) error {
    // Start watch config file change.
    if config.ComponentConfig.EnableWatch {
        go config.ComponentConfig.Watch(ctx)
    }

    ctrl := controller.New(config, stopCh)

    name, err := os.Hostname()
    if err != nil {
        panic("get hostname failed")
    }
    // 抢占主实例
    rl, err := resourcelock.NewFromKubeconfig(
        resourcelock.EndpointsLeasesResourceLock,
        config.KubeConfig.Leaderelection.Namespace,
        config.KubeConfig.Leaderelection.Name,
        resourcelock.ResourceLockConfig{
            Identity: name + "_" + string(uuid.NewUUID()),
        },
        config.KubeConfig.KubeRestConfig,
        15*time.Second)

    if err != nil {
        return err
    }

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: rl,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,
        RenewDeadline:   15 * time.Second,
        RetryPeriod:     5 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // we're notified when we start - this is where you would
                // usually put your code
                if err := ctrl.Run(ctx.Done()); err != nil {
                    panic(err)
                }
            },
            OnStoppedLeading: func() {
                // we can do cleanup here
                log.Warn("leaderelection lost")
                os.Exit(1)
            },
        },
    })

    log.Info("Stop Server With Graceful.")

    return nil
}

2. 工作流框架(子任务拆分)

有一些开源的工作流框架可以考虑,如:Conductor,但是总体使用下来比较重,性能有影响,在处理并发任务时会有故障,问题也不太好排查。如下是我开发的直接嵌入的工作流处理框架:通过编排方式对子任务主流程,子任务的失败处理FailCallBack,子任务的状态刷新SuccessCallBack,子任务的重试,子任务的回滚RollBack进行编排,可以根据业务场景灵活配置。工作流框架代码可以直接引入到业务代码中,性能较高,出问题也比较容易排查,已验证可以有效解决问题。

代码
(如下连接上下文的Metadata(子任务上下文状态同步),还没有设计为通用型的结构,各业务场景可以设计自己的模型结构)

type AppWorkFlow struct {
    Metadata *v1.WorkflowAppParams
    Works    []AppWork
}

type AppWork struct {
    WorkName        string
    Work            WorkFlowFn
    RollBack        WorkFlowFn
    SuccessCallBack WorkFlowFn
    FailCallBack    WorkFlowFn
}

type WorkFlowFn func(*v1.WorkflowAppParams) error

func (workFlow *AppWorkFlow) Start() {
    rollBackWorks := []WorkFlowFn{}
    for _, work := range workFlow.Works {
        if work.RollBack != nil {
            rollBackWorks = append(rollBackWorks, work.RollBack)
        }
        metadata := workFlow.Metadata
        if err := work.Work(metadata); err != nil {
            metadata.Logger.Infof("app %s process %s do err,%+v", workFlow.Metadata.AppRecord.Name, work.WorkName, err)
            // 更新失败状态
            if err := work.FailCallBack(metadata); err != nil {
                metadata.Logger.Warnf("app %s process %s FailCallBack err,%v", workFlow.Metadata.AppRecord.Name, work.WorkName, err)
                return
            }
            for _, rollBackWork := range rollBackWorks {
                if err := rollBackWork(workFlow.Metadata); err != nil {
                    metadata.Logger.Warnf("rollback %s process %s do err,%v", workFlow.Metadata.AppRecord.Name, work.WorkName, err)
                }
            }
            // 回滚完成即任务退出
            return
        }
    }
}

在一些错误场景,子任务需要重试,工作流框架提供重试子任务的功能可以在编排时直接配置

func WorkforTimeout(workName string, period int64, timeOut int64, work WorkFlowFn) WorkFlowFn {
    return func(app *v1.WorkflowAppParams) error {
        for timeOut > 0 {
            if err := work(app); err != nil {
                if !errors.Is(err, ErrNeedRetry) {
                    log.Errorf("%s err, %v, break work", workName, err)
                    return err
                }
                log.Infof("%s err, %v, continue work", workName, err)
                time.Sleep(time.Duration(period) * time.Second)
                timeOut--
                continue
            }
            return nil
        }
        app.AppRecord.Reason = fmt.Sprintf("%s until timeout: [%s]", workName, app.AppRecord.Reason)
        return errors.Errorf("%s until timeout", workName)
    }
}

var ErrNeedRetry = errors.New("need retry")

业务代码

每个子任务流程中只需更新上下文模型的状态,可以通过配置FailCallBack更新失败的状态,也可以也通过配置SuccsessCallBack更新任务的执行进展,若是需要在后面的子任务失败时被回滚(一些资源释放的场景)则可以配置RollBack(SuccsessCallBack和RollBack下面的流程没有举例)

const (
    WORK_CREATE_CREATEINSTANCE       = "create app instance"
    WORK_CREATE_UPDATECREATESTATUS   = "update creating status"
    WORK_CREATE_CHECKAPPCREATESTATUS = "check app create status"
    WORK_CREATE_UPDATERUNNIGSTATUS   = "update running status"
)

func (ac *AppController) CreateApp(params *v1.WorkflowAppParams) (*appv1.CreateAppResponse, error) {
    createAppWorkFlow := AppWorkFlow{
        Metadata: params,
        Works: []AppWork{
            {
                WorkName:     WORK_CREATE_CREATEINSTANCE,
                Work:         ac.createAppInstance,
                FailCallBack: ac.updateAppStatus,
            },
            {
                WorkName: WORK_CREATE_UPDATECREATESTATUS,
                Work:     ac.updateAppStatus,
            },
            {
                WorkName:     WORK_CREATE_CHECKAPPCREATESTATUS,
                Work:         WorkforTimeout(WORK_CREATE_CHECKAPPCREATESTATUS, 3, 5*60, ac.checkAppStatus), 
                FailCallBack: ac.updateAppStatus,
            },
            {
                WorkName: WORK_CREATE_UPDATERUNNIGSTATUS,
                Work:     ac.updateAppStatus,
            },
        },
    }
    go createAppWorkFlow.Start()
    return &appv1.CreateAppResponse{
        Status: 200,
        Data:   params.AppRecord.Id,
    }, nil
}

如下流程即执行子任务的过程,错误时返回err即可,会在defer中通过metadata刷新状态,工作流的主流程检查到子任务发生错误会执行FailCallBack或者RollBack刷新状态或回滚

func (ac *AppController) createAppInstance(params *v1.WorkflowAppParams) (err error) {
    defer func() {
        if err != nil {
            params.AppRecord.Reason = err.Error()
            params.AppRecord.Status = v1.APP_STATUS_FAILED
        }
    }()
        // 如下流程即执行子任务的过程
    logHeader := deepcopyMap(params.HttpHeader)
    logHeader["X-Access-Token"] = utils.MaskToken(logHeader["X-Access-Token"])
    var reqBody []byte
    var logBody []byte
    if params.AppRecord.ManageBy == "vcluster" {
        reqBody, err = json.Marshal(params.CreateAppReq)
        if err != nil {
            return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.CreateAppReq)
        }
        logBody = reqBody
    } else {
        formatReqBody := params.AppRequestData
        formatReqBody.Spec.Kubeconfig = utils.MaskToken(formatReqBody.Spec.Kubeconfig)
        logBody, _ = json.Marshal(formatReqBody)
        reqBody, err = json.Marshal(params.AppRequestData)
        if err != nil {
            return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.AppRequestData)
        }
    }
    httpLogPrint := v1.HttpLogPrint{
        Url:    params.AppConfig.Url,
        Header: logHeader,
        Method: "POST",
        Body:   string(logBody),
    }
    params.Logger.Infof("http create app instance type %s, url %s", params.AppRecord.ManageBy, params.AppConfig.Url)
    resp, err := adapter.AppPostRestRequest(params.HttpHeader, params.AppConfig.Url, reqBody, httpLogPrint)
    if err != nil {
        return errors.Wrapf(err, "[Failed to create app: %s, err, %v]", params.AppRecord.Name, err)
    }
    if resp.Code != 0 {
        return errors.Errorf("create app: %s Http Post err, errCode: %d, msg: [%s]", params.AppRecord.Name, resp.Code, resp.Msg)
    }
    if resp.Data.AppId != "" {
        // 下层应用选择用自己的appId
        params.AppRecord.AppId = resp.Data.AppId
    }
    params.AppRecord.Status = v1.APP_STATUS_CREATING
    return nil
}

本示例中刷新状态的函数作为FailCallBack,也作为子任务在上一个子任务成功时刷新状态。
总之子任务可以灵活配置,子任务之间通过metadata作为上下文进行状态同步。

func (ac *AppController) updateAppStatus(params *v1.WorkflowAppParams) error {
    if params.AppRecord.Reason != "" {
        params.AppRecord.Reason = reasonDetail(params.AppRecord.Reason)
    }
    if err := ac.controller.AppRepo().Update(params.AppRecord); err != nil {
        return errors.Errorf("update app %s status %s err, %v", params.AppRecord.Name, params.AppRecord.Status, err)
    }
    return nil
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容