Background
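When an API call triggers a complex task made up of several subtasks, making the endpoint asynchronous is a good choice. Doing so, however, raises several problems that must be solved: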
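- Reliability: if the host crashes or restarts while a task is halfway through, the task must still run to completion.
- Progress tracking: a client can query the task's progress through the API at any time (the status is refreshed whenever a subtask completes).
- Rollback on failure (FailRollBack): when a subtask fails, the preceding subtasks must be rolled back, following the reverse of the execution order.
  Example: for the flow subtask 1 -> subtask 2 -> subtask 3 -> subtask 4, if subtask 3 fails, roll back in the order subtask 3 -> subtask 2 -> subtask 1 (the rollback flow is configurable).
- Success callback: a subtask that succeeds must be able to run a SuccessCallBack.
  Example: after a subtask succeeds, updating the task's progress to "current subtask succeeded" can be done in the SuccessCallBack.
- Failure callback: a subtask that fails must be able to run a FailCallBack.
  Example: when a subtask fails, setting the task status to failed can be done in the FailCallBack.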
Solution
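1. Deploy the API service in active/standby mode (reliability).
We use Go with the Gin framework, which can handle thousands of concurrent requests, and goroutines process concurrent tasks within a single instance. If the task volume grows too large for one instance and several instances must process tasks in parallel, an event-driven master/worker model is recommended instead (not covered in this article).
The active/standby mode follows the leader-election (lease lock) mechanism used by Kubernetes controllers: if the active instance's host crashes or restarts, the standby acquires the lease once the active instance stops renewing it and becomes the new active instance, with the Kubernetes infrastructure handling the switch.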
Code:
import (
	...
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	...
)
func Run(ctx context.Context, config *appconfig.Config, stopCh <-chan struct{}) error {
	// Start watching for config file changes.
	if config.ComponentConfig.EnableWatch {
		go config.ComponentConfig.Watch(ctx)
	}
	ctrl := controller.New(config, stopCh)
	name, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("get hostname failed: %w", err)
	}
	// Compete for the active (leader) role; every candidate needs a unique identity.
	// LeasesResourceLock keeps the lock in a coordination.k8s.io Lease object. The older
	// multi-lock types such as EndpointsLeasesResourceLock are deprecated and have been
	// dropped from recent client-go releases.
	rl, err := resourcelock.NewFromKubeconfig(
		resourcelock.LeasesResourceLock,
		config.KubeConfig.Leaderelection.Namespace,
		config.KubeConfig.Leaderelection.Name,
		resourcelock.ResourceLockConfig{
			Identity: name + "_" + string(uuid.NewUUID()),
		},
		config.KubeConfig.KubeRestConfig,
		15*time.Second)
	if err != nil {
		return err
	}
	// Start the leader election loop; RunOrDie blocks until ctx is cancelled or leadership is lost.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: rl,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
		// A standby can take over at most LeaseDuration after the leader stops renewing.
		LeaseDuration: 60 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// Only the leader runs the controller, and therefore the workflows.
				if err := ctrl.Run(ctx.Done()); err != nil {
					panic(err)
				}
			},
			OnStoppedLeading: func() {
				// RunOrDie also calls this on a graceful shutdown (ctx cancelled),
				// so only exit hard when leadership was actually lost.
				if ctx.Err() != nil {
					return
				}
				log.Warn("leaderelection lost")
				os.Exit(1)
			},
		},
	})
	log.Info("Stop Server With Graceful.")
	return nil
}
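2. Workflow framework (splitting the task into subtasks)
Open-source workflow engines such as Conductor are an option, but in our use they proved heavyweight: they cost performance, failed when handling concurrent tasks, and were hard to troubleshoot. Below is a lightweight workflow framework I wrote that embeds directly in the service. It orchestrates each subtask's main logic, its failure handling (FailCallBack), its status refresh (SuccessCallBack), retries, and rollback (RollBack), all configurable per business scenario. Because the framework code lives inside the business code, it performs well and problems are easy to trace; it has been verified in practice to solve the problems listed above.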
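Code
(The Metadata field below is the shared context through which subtasks synchronize state. It is not yet a generic structure; each business scenario can design its own model.)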
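type AppWorkFlow struct {
	Metadata *v1.WorkflowAppParams // shared context passed to every subtask
	Works    []AppWork             // subtasks, executed in order
}

type AppWork struct {
	WorkName        string
	Work            WorkFlowFn // main logic of the subtask (required)
	RollBack        WorkFlowFn // undo logic, run when this or a later subtask fails (optional)
	SuccessCallBack WorkFlowFn // run after Work succeeds, e.g. to refresh progress (optional)
	FailCallBack    WorkFlowFn // run after Work fails, e.g. to persist the failed status (optional)
}

type WorkFlowFn func(*v1.WorkflowAppParams) error

func (workFlow *AppWorkFlow) Start() {
	metadata := workFlow.Metadata
	appName := metadata.AppRecord.Name
	// Subtasks with a RollBack that have started so far, in execution order.
	var rollBackWorks []AppWork
	for _, work := range workFlow.Works {
		if work.RollBack != nil {
			rollBackWorks = append(rollBackWorks, work)
		}
		if err := work.Work(metadata); err != nil {
			metadata.Logger.Warnf("app %s process %s do err, %+v", appName, work.WorkName, err)
			// Persist the failed status; a callback error is logged but does not skip the rollback.
			if work.FailCallBack != nil {
				if err := work.FailCallBack(metadata); err != nil {
					metadata.Logger.Warnf("app %s process %s FailCallBack err, %v", appName, work.WorkName, err)
				}
			}
			// Roll back in reverse order: the failed subtask first, then its predecessors.
			for i := len(rollBackWorks) - 1; i >= 0; i-- {
				rb := rollBackWorks[i]
				if err := rb.RollBack(metadata); err != nil {
					metadata.Logger.Warnf("app %s rollback %s err, %v", appName, rb.WorkName, err)
				}
			}
			// The workflow ends once the rollback is done.
			return
		}
		// Refresh progress after a successful subtask.
		if work.SuccessCallBack != nil {
			if err := work.SuccessCallBack(metadata); err != nil {
				metadata.Logger.Warnf("app %s process %s SuccessCallBack err, %v", appName, work.WorkName, err)
			}
		}
	}
}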
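In some error scenarios a subtask must be retried. The framework provides a retry wrapper that can be configured directly when composing the workflow: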
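// WorkforTimeout wraps work with retries: it re-runs work every period seconds
// while work returns ErrNeedRetry, and gives up once timeOut seconds have elapsed.
// Any other error stops the retries immediately.
func WorkforTimeout(workName string, period int64, timeOut int64, work WorkFlowFn) WorkFlowFn {
	return func(app *v1.WorkflowAppParams) error {
		// Compute the deadline per call, so the returned function can be reused safely
		// (decrementing the captured timeOut would share one budget across all calls).
		deadline := time.Now().Add(time.Duration(timeOut) * time.Second)
		for time.Now().Before(deadline) {
			err := work(app)
			if err == nil {
				return nil
			}
			if !errors.Is(err, ErrNeedRetry) {
				log.Errorf("%s err, %v, break work", workName, err)
				return err
			}
			log.Infof("%s err, %v, continue work", workName, err)
			time.Sleep(time.Duration(period) * time.Second)
		}
		app.AppRecord.Reason = fmt.Sprintf("%s timed out: [%s]", workName, app.AppRecord.Reason)
		return errors.Errorf("%s timed out", workName)
	}
}

// ErrNeedRetry signals that a subtask has not finished yet and should be retried.
var ErrNeedRetry = errors.New("need retry")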
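Business code
Each subtask only needs to update the state of the context model. Configure FailCallBack to persist a failed status, SuccessCallBack to update the task's progress, and RollBack for subtasks that must be undone when a later subtask fails (for example, to release resources). The flow below does not use SuccessCallBack or RollBack.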
const (
	WORK_CREATE_CREATEINSTANCE       = "create app instance"
	WORK_CREATE_UPDATECREATESTATUS   = "update creating status"
	WORK_CREATE_CHECKAPPCREATESTATUS = "check app create status"
	WORK_CREATE_UPDATERUNNINGSTATUS  = "update running status"
)

func (ac *AppController) CreateApp(params *v1.WorkflowAppParams) (*appv1.CreateAppResponse, error) {
	createAppWorkFlow := AppWorkFlow{
		Metadata: params,
		Works: []AppWork{
			{
				// Call the downstream service; on failure persist the failed status.
				WorkName:     WORK_CREATE_CREATEINSTANCE,
				Work:         ac.createAppInstance,
				FailCallBack: ac.updateAppStatus,
			},
			{
				// Persist the "creating" status set by the previous subtask.
				WorkName: WORK_CREATE_UPDATECREATESTATUS,
				Work:     ac.updateAppStatus,
			},
			{
				// Poll until the app is ready, retrying every 3 s while checkAppStatus returns ErrNeedRetry.
				WorkName:     WORK_CREATE_CHECKAPPCREATESTATUS,
				Work:         WorkforTimeout(WORK_CREATE_CHECKAPPCREATESTATUS, 3, 5*60, ac.checkAppStatus),
				FailCallBack: ac.updateAppStatus,
			},
			{
				// Persist the running status.
				WorkName: WORK_CREATE_UPDATERUNNINGSTATUS,
				Work:     ac.updateAppStatus,
			},
		},
	}
	// Run the workflow asynchronously and return the app ID to the client immediately.
	go createAppWorkFlow.Start()
	return &appv1.CreateAppResponse{
		Status: 200,
		Data:   params.AppRecord.Id,
	}, nil
}
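The function below implements one subtask. On error it simply returns err: a deferred function records the failure in the metadata, and when the main workflow loop sees the error it runs FailCallBack (to persist the status) and RollBack (to undo earlier subtasks).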
func (ac *AppController) createAppInstance(params *v1.WorkflowAppParams) (err error) {
	// err is a named result, so this deferred function sees the error from every
	// return path and records it in the shared context for FailCallBack to persist.
	defer func() {
		if err != nil {
			params.AppRecord.Reason = err.Error()
			params.AppRecord.Status = v1.APP_STATUS_FAILED
		}
	}()
	// Subtask body: build the request, masking secrets only in the copies that get logged.
	logHeader := deepcopyMap(params.HttpHeader)
	logHeader["X-Access-Token"] = utils.MaskToken(logHeader["X-Access-Token"])
	var reqBody []byte
	var logBody []byte
	if params.AppRecord.ManageBy == "vcluster" {
		reqBody, err = json.Marshal(params.CreateAppReq)
		if err != nil {
			return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.CreateAppReq)
		}
		logBody = reqBody
	} else {
		// Marshal the real request first, so masking the logged copy cannot leak into the request body.
		reqBody, err = json.Marshal(params.AppRequestData)
		if err != nil {
			return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.AppRequestData)
		}
		formatReqBody := params.AppRequestData
		formatReqBody.Spec.Kubeconfig = utils.MaskToken(formatReqBody.Spec.Kubeconfig)
		logBody, _ = json.Marshal(formatReqBody)
	}
	httpLogPrint := v1.HttpLogPrint{
		Url:    params.AppConfig.Url,
		Header: logHeader,
		Method: "POST",
		Body:   string(logBody),
	}
	params.Logger.Infof("http create app instance type %s, url %s", params.AppRecord.ManageBy, params.AppConfig.Url)
	resp, err := adapter.AppPostRestRequest(params.HttpHeader, params.AppConfig.Url, reqBody, httpLogPrint)
	if err != nil {
		// Wrapf already appends err to the message.
		return errors.Wrapf(err, "[Failed to create app: %s]", params.AppRecord.Name)
	}
	if resp.Code != 0 {
		return errors.Errorf("create app: %s Http Post err, errCode: %d, msg: [%s]", params.AppRecord.Name, resp.Code, resp.Msg)
	}
	if resp.Data.AppId != "" {
		// The downstream service may assign its own appId; prefer it when present.
		params.AppRecord.AppId = resp.Data.AppId
	}
	params.AppRecord.Status = v1.APP_STATUS_CREATING
	return nil
}
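In this example the status-refresh function serves both as a FailCallBack and as a standalone subtask that persists the status after the previous subtask succeeds.
In short, subtasks can be composed flexibly, and they synchronize state through the metadata, which acts as their shared context.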
// updateAppStatus persists the in-memory AppRecord (status and reason) to the repository.
func (ac *AppController) updateAppStatus(params *v1.WorkflowAppParams) error {
	if params.AppRecord.Reason != "" {
		params.AppRecord.Reason = reasonDetail(params.AppRecord.Reason)
	}
	if err := ac.controller.AppRepo().Update(params.AppRecord); err != nil {
		return errors.Wrapf(err, "update app %s status %s", params.AppRecord.Name, params.AppRecord.Status)
	}
	return nil
}