# Openkruise/rollouts Source Code Walkthrough

Recently, because of work, I happened to come across rollouts, an open-source project. It is a Kubernetes-related tool open-sourced by Alibaba.

Its main purpose is the batched release and rollback of workloads.

The workloads currently supported are:

  • Deployment
  • CloneSet
  • StatefulSet

It also supports traffic routing control for Nginx/ALB/Istio.

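Normally, when we upgrade a workload (take Deployment as an example), we change the image version of the Deployment, and the Deployment controller creates new replicas that replace the old ones.
In that case, if the new replicas are Ready at the process level but the business itself is not actually ready, the controller still goes on to update all the remaining replicas. Even if we later notice the problem and roll back, the blast radius is already very large.
So we need a window in which to change our minds: time for someone to check that the business is healthy before the canary release continues, instead of going ALL IN and then ALL GET OUT. That is exactly what rollouts does for us.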

Basic usage

See the basic usage documentation.

Source code walkthrough

CRD design

Rollout

WorkloadRef defines the workload object we want to watch. When that object's PodTemplate changes, the update strategy is executed.

// WorkloadRef holds a references to the Kubernetes object
type WorkloadRef struct {
    // API Version of the referent
    APIVersion string `json:"apiVersion"`
    // Kind of the referent
    Kind string `json:"kind"`
    // Name of the referent
    Name string `json:"name"`
}
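
To make the role of these three fields more concrete, here is a minimal sketch of turning a WorkloadRef into a key that identifies the workload type. The helpers `splitAPIVersion` and `watchKey` are hypothetical names I made up for illustration; they are not the project's own functions, and the struct is re-declared locally so the example runs on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// WorkloadRef mirrors the struct shown above (JSON tags omitted).
type WorkloadRef struct {
	APIVersion string
	Kind       string
	Name       string
}

// splitAPIVersion is a hypothetical helper: it splits "group/version" into its
// two parts. Core-group resources such as "v1" have an empty group.
func splitAPIVersion(apiVersion string) (group, version string) {
	if i := strings.LastIndex(apiVersion, "/"); i >= 0 {
		return apiVersion[:i], apiVersion[i+1:]
	}
	return "", apiVersion
}

// watchKey builds a string key that identifies the workload type (not the
// individual object), so one watcher per type is enough.
func watchKey(ref WorkloadRef) string {
	group, version := splitAPIVersion(ref.APIVersion)
	return fmt.Sprintf("%s/%s, Kind=%s", group, version, ref.Kind)
}

func main() {
	ref := WorkloadRef{APIVersion: "apps/v1", Kind: "Deployment", Name: "echoserver"}
	fmt.Println(watchKey(ref))
}
```

Note that Name does not take part in the key: two Rollouts pointing at different Deployments share the same workload type.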

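CanaryStrategy defines our update strategy, and CanaryStep defines what each step does: how many replicas to update, how long to pause once the update is done, and the traffic weight.
I won't go into the traffic-related parts here; feel free to read them yourself if you are interested.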

// CanaryStrategy defines parameters for a Replica Based Canary
type CanaryStrategy struct {
    // Steps define the order of phases to execute release in batches(20%, 40%, 60%, 80%, 100%)
    // +optional
    Steps []CanaryStep `json:"steps,omitempty"`
    // TrafficRoutings hosts all the supported service meshes supported to enable more fine-grained traffic routing
    TrafficRoutings []*TrafficRouting `json:"trafficRoutings,omitempty"`
}

// CanaryStep defines a step of a canary workload.
type CanaryStep struct {
    // SetWeight sets what percentage of the canary pods should receive
    // +optional
    Weight *int32 `json:"weight,omitempty"`
    // Replicas is the number of expected canary pods in this batch
    // it can be an absolute number (ex: 5) or a percentage of total pods.
    Replicas *intstr.IntOrString `json:"replicas,omitempty"`
    // Pause defines a pause stage for a rollout, manual or auto
    // +optional
    Pause RolloutPause `json:"pause,omitempty"`
}
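
Since Replicas can be either an absolute number or a percentage, it helps to see how such a value could be resolved against the workload's total replica count. The sketch below is only an illustration: `canaryReplicas` is a hypothetical function, it takes the value as a plain string instead of `intstr.IntOrString`, and rounding percentages up is my assumption rather than something confirmed by the code above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// canaryReplicas resolves a step's replicas value ("5" or "20%") against the
// workload's total replica count. Rounding percentages up is an assumption.
func canaryReplicas(replicas string, total int) (int, error) {
	if strings.HasSuffix(replicas, "%") {
		pct, err := strconv.Atoi(strings.TrimSuffix(replicas, "%"))
		if err != nil {
			return 0, fmt.Errorf("invalid percentage %q: %w", replicas, err)
		}
		return (total*pct + 99) / 100, nil // integer ceiling of total*pct/100
	}
	n, err := strconv.Atoi(replicas)
	if err != nil {
		return 0, fmt.Errorf("invalid replica count %q: %w", replicas, err)
	}
	return n, nil
}

func main() {
	// A hypothetical four-step plan applied to a workload with 10 replicas.
	for _, step := range []string{"1", "20%", "50%", "100%"} {
		n, err := canaryReplicas(step, 10)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("replicas=%s -> %d of 10 pods\n", step, n)
	}
}
```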

The parts above define the object we watch and the actions we take.
The part below defines the rollout's status; while a rollout is running, its state is recorded here.

type RolloutStatus struct {
    // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
    // Important: Run "make" to regenerate code after modifying this file
    
    // observedGeneration is the most recent generation observed for this Rollout.
    ObservedGeneration int64 `json:"observedGeneration,omitempty"`
    // CanaryRevision the hash of the canary pod template
    // +optional
    //CanaryRevision string `json:"canaryRevision,omitempty"`
    // StableRevision indicates the revision pods that has successfully rolled out
    StableRevision string `json:"stableRevision,omitempty"`
    // Conditions a list of conditions a rollout can have.
    // +optional
    Conditions []RolloutCondition `json:"conditions,omitempty"`
    // Canary describes the state of the canary rollout
    // +optional
    CanaryStatus *CanaryStatus `json:"canaryStatus,omitempty"`
    // +optional
    //BlueGreenStatus *BlueGreenStatus `json:"blueGreenStatus,omitempty"`
    // Phase is the rollout phase.
    Phase RolloutPhase `json:"phase,omitempty"`
    // Message provides details on why the rollout is in its current phase
    Message string `json:"message,omitempty"`
}

// RolloutCondition describes the state of a rollout at a certain point.
type RolloutCondition struct {
    // Type of rollout condition.
    Type RolloutConditionType `json:"type"`
    // Phase of the condition, one of True, False, Unknown.
    Status corev1.ConditionStatus `json:"status"`
    // The last time this condition was updated.
    LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
    // Last time the condition transitioned from one status to another.
    LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
    // The reason for the condition's last transition.
    Reason string `json:"reason"`
    // A human readable message indicating details about the transition.
    Message string `json:"message"`
}
// CanaryStatus status fields that only pertain to the canary rollout
type CanaryStatus struct {
    // observedWorkloadGeneration is the most recent generation observed for this Rollout ref workload generation.
    ObservedWorkloadGeneration int64 `json:"observedWorkloadGeneration,omitempty"`
    // ObservedRolloutID will record the newest spec.RolloutID if status.canaryRevision equals to workload.updateRevision
    ObservedRolloutID string `json:"observedRolloutID,omitempty"`
    // RolloutHash from rollout.spec object
    RolloutHash string `json:"rolloutHash,omitempty"`
    // CanaryService holds the name of a service which selects pods with canary version and don't select any pods with stable version.
    CanaryService string `json:"canaryService"`
    // CanaryRevision is calculated by rollout based on podTemplateHash, and the internal logic flow uses
    // It may be different from rs podTemplateHash in different k8s versions, so it cannot be used as service selector label
    // +optional
    CanaryRevision string `json:"canaryRevision"`
    // pod template hash is used as service selector label
    PodTemplateHash string `json:"podTemplateHash"`
    // CanaryReplicas the numbers of canary revision pods
    CanaryReplicas int32 `json:"canaryReplicas"`
    // CanaryReadyReplicas the numbers of ready canary revision pods
    CanaryReadyReplicas int32 `json:"canaryReadyReplicas"`
    // CurrentStepIndex defines the current step of the rollout is on. If the current step index is null, the
    // controller will execute the rollout.
    // +optional
    CurrentStepIndex int32           `json:"currentStepIndex"`
    CurrentStepState CanaryStepState `json:"currentStepState"`
    Message          string          `json:"message,omitempty"`
    LastUpdateTime   *metav1.Time    `json:"lastUpdateTime,omitempty"`
}

The rollout as a whole goes through these phases:

const (
    // RolloutPhaseInitial indicates a rollout is Initial
    RolloutPhaseInitial RolloutPhase = "Initial"
    // RolloutPhaseHealthy indicates a rollout is healthy
    RolloutPhaseHealthy RolloutPhase = "Healthy"
    // RolloutPhaseProgressing indicates a rollout is not yet healthy but still making progress towards a healthy state
    RolloutPhaseProgressing RolloutPhase = "Progressing"
    // RolloutPhaseTerminating indicates a rollout is terminated
    RolloutPhaseTerminating RolloutPhase = "Terminating"
)

Each CanaryStep has these states:

const (
    CanaryStepStateUpgrade         CanaryStepState = "StepUpgrade"
    CanaryStepStateTrafficRouting  CanaryStepState = "StepTrafficRouting"
    CanaryStepStateMetricsAnalysis CanaryStepState = "StepMetricsAnalysis"
    CanaryStepStatePaused          CanaryStepState = "StepPaused"
    CanaryStepStateReady           CanaryStepState = "StepReady"
    CanaryStepStateCompleted       CanaryStepState = "Completed"
)

The Conditions have these types and reasons:

const (
    RolloutConditionProgressing RolloutConditionType = "Progressing"
    // Progressing Reason
    ProgressingReasonInitializing = "Initializing"
    ProgressingReasonInRolling    = "InRolling"
    ProgressingReasonFinalising   = "Finalising"
    ProgressingReasonSucceeded    = "Succeeded"
    ProgressingReasonCancelling   = "Cancelling"
    ProgressingReasonCanceled     = "Canceled"
    ProgressingReasonPaused       = "Paused"
    
    // Terminating condition
    RolloutConditionTerminating RolloutConditionType = "Terminating"
    // Terminating Reason
    TerminatingReasonInTerminating = "InTerminating"
    TerminatingReasonCompleted     = "Completed"
)

BatchRelease

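I won't say much about the Spec of BatchRelease, since it differs little from Rollout's. The main difference lies in what the Status records.

BatchReleaseCanaryStatus records the release state of each batch. CurrentBatchState has three states: Upgrading, Verifying, and Ready.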

type BatchReleaseCanaryStatus struct {
    // CurrentBatchState indicates the release state of the current batch.
    CurrentBatchState BatchReleaseBatchStateType `json:"batchState,omitempty"`
    // The current batch the rollout is working on/blocked, it starts from 0
    CurrentBatch int32 `json:"currentBatch"`
    // BatchReadyTime is the ready timestamp of the current batch or the last batch.
    // This field is updated once a batch ready, and the batches[x].pausedSeconds
    // relies on this field to calculate the real-time duration.
    BatchReadyTime *metav1.Time `json:"batchReadyTime,omitempty"`
    // UpdatedReplicas is the number of upgraded Pods.
    UpdatedReplicas int32 `json:"updatedReplicas,omitempty"`
    // UpdatedReadyReplicas is the number upgraded Pods that have a Ready Condition.
    UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"`
}

const (
    // UpgradingBatchState indicates that current batch is at upgrading pod state
    UpgradingBatchState BatchReleaseBatchStateType = "Upgrading"
    // VerifyingBatchState indicates that current batch is at verifying whether it's ready state
    VerifyingBatchState BatchReleaseBatchStateType = "Verifying"
    // ReadyBatchState indicates that current batch is at batch ready state
    ReadyBatchState BatchReleaseBatchStateType = "Ready"
)

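These two CRDs are the core of the whole project, and their state transitions together make up the complete upgrade flow. Rollout controls the overall flow and is created by the user; BatchRelease handles the actual release of each workload and is created by Rollout.

Code walkthrough

I won't go through the main function in detail. It mainly starts two controllers that manage the Rollout and BatchRelease CRDs respectively, and it also starts a webhook that pauses the workload so that the native Kubernetes controller holds off on upgrading for the time being.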

// main.go
func main() {
    ...
    // Manager for Rollout
    mgr, err := ctrl.NewManager(cfg, ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "71ddec2c.kruise.io",
        NewClient:              utilclient.NewClient,
    })
    ...
    // Register the BatchRelease controller with the manager
    if err = br.Add(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "BatchRelease")
        os.Exit(1)
    }
    ...
    // Webhooks for workload and rollout
    if err = webhook.SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to setup webhook")
        os.Exit(1)
    }
    ...
}

The Rollout reconcile process

type RolloutReconciler struct {
    client.Client
    Scheme *runtime.Scheme

    Recorder record.EventRecorder   // records events
    Finder   *util.ControllerFinder // finds the controller of the referenced workload
}

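Let's take a closer look at the main flow of Rollout Reconcile.
It can be summarized in the following steps:

  1. Watch for rollout changes
  2. Obtain a dynamic Watcher to watch updates of the corresponding workload
  3. Handle the finalizer: add it if the Rollout is newly created, remove it if the Rollout is being deleted
  4. Update the rollout status; the phases mainly move through: RolloutPhaseInitial->RolloutPhaseHealthy->RolloutPhaseProgressing->RolloutPhaseHealthy->RolloutPhaseTerminating
  5. Handle the update flow in the RolloutPhaseProgressing phase
  6. Handle the termination flow in the RolloutPhaseTerminating phase

func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {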
    // 1. Watch for rollout changes: fetch the Rollout named in the request
    rollout := &rolloutv1alpha1.Rollout{}
    err := r.Get(context.TODO(), req.NamespacedName, rollout)
    if err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }
    // 2. Obtain a dynamic Watcher to watch updates of the corresponding workload
    workloadRef := rollout.Spec.ObjectRef.WorkloadRef
    workloadGVK := util.GetGVKFrom(workloadRef)
    _, exists := watchedWorkload.Load(workloadGVK.String())
    if workloadRef != nil && !exists {
        succeeded, err := util.AddWatcherDynamically(runtimeController, workloadHandler, workloadGVK)
        if err != nil {
            return ctrl.Result{}, err
        } else if succeeded {
            watchedWorkload.LoadOrStore(workloadGVK.String(), struct{}{})
            klog.Infof("Rollout controller begin to watch workload type: %s", workloadGVK.String())
            return ctrl.Result{}, nil
        }
    }
    // 3. Handle the finalizer
    err = r.handleFinalizer(rollout)
    if err != nil {
        return ctrl.Result{}, err
    }
    // 4. Update the rollout status
    done, err := r.updateRolloutStatus(rollout)
    if err != nil {
        return ctrl.Result{}, err
    } else if !done {
        return ctrl.Result{}, nil
    }

    var recheckTime *time.Time
    switch rollout.Status.Phase {
    case rolloutv1alpha1.RolloutPhaseProgressing:
        // 5. Handle the update flow in the RolloutPhaseProgressing phase
        recheckTime, err = r.reconcileRolloutProgressing(rollout)
    case rolloutv1alpha1.RolloutPhaseTerminating:
        // 6. Handle the termination flow in the RolloutPhaseTerminating phase
        recheckTime, err = r.reconcileRolloutTerminating(rollout)
    }
    if err != nil {
        return ctrl.Result{}, err
    } else if recheckTime != nil {
        return ctrl.Result{RequeueAfter: time.Until(*recheckTime)}, nil
    }
    return ctrl.Result{}, nil
}
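
Step 2 boils down to "start a watcher for each workload type exactly once". Here is a minimal sketch of that pattern using only `sync.Map` from the standard library. `ensureWatch` and the `start` callback are hypothetical stand-ins for the real registration call, so treat this as an illustration of the idea rather than the project's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// watched records the workload types that already have a watcher.
var watched sync.Map

// ensureWatch calls start only for a key that is not being watched yet and
// reports whether a new watcher was started. start is a stand-in for the
// real registration call.
func ensureWatch(key string, start func(string) error) (bool, error) {
	if _, exists := watched.Load(key); exists {
		return false, nil
	}
	if err := start(key); err != nil {
		return false, err // nothing is stored, so a later reconcile can retry
	}
	watched.LoadOrStore(key, struct{}{})
	return true, nil
}

func main() {
	start := func(key string) error {
		fmt.Println("start watching", key)
		return nil
	}
	// The second call finds the key already stored and does nothing.
	for _, key := range []string{"apps/v1, Kind=Deployment", "apps/v1, Kind=Deployment"} {
		started, err := ensureWatch(key, start)
		fmt.Println("started:", started, "err:", err)
	}
}
```

Because the key is stored only after a successful registration, a failed attempt leaves no trace and is simply retried on the next reconcile.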

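Steps 1 to 3 are fairly simple, so I won't dwell on them. Let's go through steps 4, 5, and 6 in detail.

4. Updating the rollout status

Part of the code has been trimmed.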

func (r *RolloutReconciler) updateRolloutStatus(rollout *rolloutv1alpha1.Rollout) (done bool, err error) {
    ... 
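    // 1. If the Rollout object is being deleted, the status Phase moves from whatever phase it was in to the terminating phase (RolloutPhaseTerminating), and a Condition RolloutConditionTerminating=false is added.
    // Pay special attention to the Reason here, because the Condition is also one of the inputs to the state transitions.
    // 2. If the status Phase is empty, it defaults to the initial phase (RolloutPhaseInitial).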
    if !rollout.DeletionTimestamp.IsZero() && newStatus.Phase != rolloutv1alpha1.RolloutPhaseTerminating {
        newStatus.Phase = rolloutv1alpha1.RolloutPhaseTerminating
        cond := util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionTerminating, corev1.ConditionFalse, rolloutv1alpha1.TerminatingReasonInTerminating, "Rollout is in terminating")
        util.SetRolloutCondition(&newStatus, *cond)
    } else if newStatus.Phase == "" {
        newStatus.Phase = rolloutv1alpha1.RolloutPhaseInitial
    }
    // 3. Fetch the workload referenced by the rollout; if it is not found, reset the status Phase to RolloutPhaseInitial.
    workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
    if err != nil {
        klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error())
        return
    } else if workload == nil {
        if rollout.DeletionTimestamp.IsZero() {
            resetStatus(&newStatus)
            klog.Infof("rollout(%s/%s) workload not found, and reset status be Initial", rollout.Namespace, rollout.Name)
        }
        done = true
        return
    }
    ...
    // 4. Rollout status Phase transitions. On each reconcile, one transition is made if its condition holds, in this order:
    // RolloutPhaseInitial --> RolloutPhaseHealthy
    // RolloutPhaseHealthy --> RolloutPhaseProgressing
    // RolloutPhaseProgressing --> RolloutPhaseHealthy
    // The RolloutPhaseHealthy --> RolloutPhaseProgressing transition sets a Condition of Type RolloutConditionProgressing with Reason ProgressingReasonInitializing.
    // The RolloutPhaseProgressing --> RolloutPhaseHealthy transition requires the RolloutConditionProgressing Condition to have Reason ProgressingReasonSucceeded or ProgressingReasonCanceled,
    // or no Condition of Type RolloutConditionProgressing to exist at all.
    switch newStatus.Phase {
    case rolloutv1alpha1.RolloutPhaseInitial:
        klog.Infof("rollout(%s/%s) status phase from(%s) -> to(%s)", rollout.Namespace, rollout.Name, rolloutv1alpha1.RolloutPhaseInitial, rolloutv1alpha1.RolloutPhaseHealthy)
        newStatus.Phase = rolloutv1alpha1.RolloutPhaseHealthy
        newStatus.Message = "rollout is healthy"
    case rolloutv1alpha1.RolloutPhaseHealthy:
        if workload.InRolloutProgressing {
            // from healthy to progressing
            klog.Infof("rollout(%s/%s) status phase from(%s) -> to(%s)", rollout.Namespace, rollout.Name, rolloutv1alpha1.RolloutPhaseHealthy, rolloutv1alpha1.RolloutPhaseProgressing)
            newStatus.Phase = rolloutv1alpha1.RolloutPhaseProgressing
            cond := util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionProgressing, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInitializing, "Rollout is in Progressing")
            util.SetRolloutCondition(&newStatus, *cond)
        } else if workload.IsInStable && newStatus.CanaryStatus == nil {
            newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{
                CanaryReplicas:             workload.CanaryReplicas,
                CanaryReadyReplicas:        workload.CanaryReadyReplicas,
                ObservedRolloutID:          getRolloutID(workload, rollout),
                ObservedWorkloadGeneration: workload.Generation,
                PodTemplateHash:            workload.PodTemplateHash,
                CanaryRevision:             workload.CanaryRevision,
                CurrentStepIndex:           int32(len(rollout.Spec.Strategy.Canary.Steps)),
                CurrentStepState:           rolloutv1alpha1.CanaryStepStateCompleted,
            }
            newStatus.Message = "workload deployment is completed"
        }
    case rolloutv1alpha1.RolloutPhaseProgressing:
        cond := util.GetRolloutCondition(newStatus, rolloutv1alpha1.RolloutConditionProgressing)
        if cond == nil || cond.Reason == rolloutv1alpha1.ProgressingReasonSucceeded || cond.Reason == rolloutv1alpha1.ProgressingReasonCanceled {
            newStatus.Phase = rolloutv1alpha1.RolloutPhaseHealthy
        }
    }
    done = true
    return
}
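
The phase logic above can be condensed into a small pure function, which makes the transitions easier to reason about and to test. This is a simplified model under my own assumptions: `nextPhase` is a hypothetical name, the "workload not found" reset is left out, and the workload and condition are reduced to two plain parameters.

```go
package main

import "fmt"

type Phase string

const (
	Initial     Phase = "Initial"
	Healthy     Phase = "Healthy"
	Progressing Phase = "Progressing"
	Terminating Phase = "Terminating"
)

// nextPhase is a simplified model of the switch above. deleting mirrors a
// non-zero DeletionTimestamp, inProgressing mirrors workload.InRolloutProgressing,
// and reason is the Reason of the Progressing condition ("" when it is absent).
func nextPhase(cur Phase, deleting, inProgressing bool, reason string) Phase {
	if deleting {
		return Terminating
	}
	switch cur {
	case "", Initial: // an empty phase is first defaulted to Initial
		return Healthy
	case Healthy:
		if inProgressing {
			return Progressing
		}
	case Progressing:
		if reason == "" || reason == "Succeeded" || reason == "Canceled" {
			return Healthy
		}
	}
	return cur
}

func main() {
	p := nextPhase("", false, false, "")
	fmt.Println(p) // new rollout
	p = nextPhase(p, false, true, "")
	fmt.Println(p) // pod template changed
	p = nextPhase(p, false, false, "InRolling")
	fmt.Println(p) // still rolling
	p = nextPhase(p, false, false, "Succeeded")
	fmt.Println(p) // rollout finished
	p = nextPhase(p, true, false, "")
	fmt.Println(p) // rollout deleted
}
```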

5. Handling the update flow in the RolloutPhaseProgressing phase

Once the rollout status Phase is RolloutPhaseProgressing, the subsequent state transitions are mainly changes to the Reason of the Condition whose Type is RolloutConditionProgressing.

func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *rolloutv1alpha1.Rollout) (*time.Time, error) {
    // 1. Get the Condition whose Type is RolloutConditionProgressing, and the workload
    cond := util.GetRolloutCondition(rollout.Status, rolloutv1alpha1.RolloutConditionProgressing)
    workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
    if err != nil {
        return nil, err
    } else if workload == nil {
        return nil, nil
    } else if !workload.IsStatusConsistent {
        return nil, nil
    }
    
    var recheckTime *time.Time
    newStatus := rollout.Status.DeepCopy()
    // Condition Reason transitions. On each reconcile, one transition is made if its condition holds. The possible transitions are:
    // ProgressingReasonInitializing --> ProgressingReasonInRolling
    // ProgressingReasonInRolling --> ProgressingReasonCancelling
    // ProgressingReasonInRolling --> ProgressingReasonPaused
    // ProgressingReasonInRolling --> ProgressingReasonInitializing
    // ProgressingReasonInRolling --> ProgressingReasonFinalising
    // ProgressingReasonFinalising --> ProgressingReasonSucceeded
    // ProgressingReasonPaused --> ProgressingReasonCancelling
    // ProgressingReasonPaused --> ProgressingReasonInRolling
    // ProgressingReasonCancelling --> ProgressingReasonCanceled
    switch cond.Reason {
    case rolloutv1alpha1.ProgressingReasonInitializing:
        newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{} // initialize CanaryStatus
        done, _, err := r.doProgressingInitializing(rollout, newStatus)
        if err != nil {
            return nil, err
        } else if done {
            progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInRolling, "Rollout is in Progressing")
        }
    case rolloutv1alpha1.ProgressingReasonInRolling:
        recheckTime, err = r.doProgressingInRolling(rollout, workload, newStatus)
    case rolloutv1alpha1.ProgressingReasonFinalising:
        var done bool
        done, recheckTime, err = r.doFinalising(rollout, newStatus, true)
        if done {
            progressingStateTransition(newStatus, corev1.ConditionTrue, rolloutv1alpha1.ProgressingReasonSucceeded, "Rollout has been completed, and succeed")
        }
    case rolloutv1alpha1.ProgressingReasonPaused:
        if workload.IsInRollback {
            newStatus.CanaryStatus.CanaryRevision = workload.CanaryRevision
            r.Recorder.Eventf(rollout, corev1.EventTypeNormal, "Progressing", "workload has been rollback, then rollout is canceled")
            progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCancelling, "The workload has been rolled back and the rollout process will be cancelled")
        } else if !rollout.Spec.Strategy.Paused {
            progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInRolling, "")
        }

    case rolloutv1alpha1.ProgressingReasonCancelling:
        var done bool
        done, recheckTime, err = r.doFinalising(rollout, newStatus, false)
        if done {
            progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCanceled, "")
        }
    case rolloutv1alpha1.ProgressingReasonSucceeded, rolloutv1alpha1.ProgressingReasonCanceled:
    }

    err = r.updateRolloutStatusInternal(rollout, *newStatus)
    if err != nil {
        return nil, err
    }
    return recheckTime, nil
}

The main state transitions are:

  1. ProgressingReasonInitializing --> ProgressingReasonInRolling
  2. ProgressingReasonInRolling --> ProgressingReasonCancelling
  3. ProgressingReasonInRolling --> ProgressingReasonPaused
  4. ProgressingReasonInRolling --> ProgressingReasonInitializing
  5. ProgressingReasonInRolling --> ProgressingReasonFinalising
  6. ProgressingReasonFinalising --> ProgressingReasonSucceeded
  7. ProgressingReasonPaused --> ProgressingReasonCancelling
  8. ProgressingReasonPaused --> ProgressingReasonInRolling
  9. ProgressingReasonCancelling --> ProgressingReasonCanceled

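The ProgressingReasonInRolling reason is the core: it handles the various situations a workload can run into during an update.

  1. If the workload needs to be rolled back, enter the ProgressingReasonCancelling state.
  2. If the workload needs to be paused, enter the ProgressingReasonPaused state.
  3. If the workload is released again while a release is still in progress (continuous release), delete the BatchRelease that was already created, then re-enter the ProgressingReasonInitializing state.
  4. If the rollout itself has changed, recompute which step of the plan we are currently on, then update Status.CanaryStatus.
  5. If Status.CanaryStatus.CurrentStepState is CanaryStepStateCompleted, meaning the update has finished, enter the ProgressingReasonFinalising state.
  6. Otherwise, carry on with the normal update.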
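
The transition list and the InRolling decision rules above can be put side by side in a small sketch. The `transitions` table is copied from the list of nine transitions; `signals` and `nextReason` are hypothetical names, and evaluating the checks in the listed order is my assumption about priority, not something the excerpt confirms.

```go
package main

import "fmt"

// transitions lists, for each Progressing reason, the reasons it may move to.
var transitions = map[string][]string{
	"Initializing": {"InRolling"},
	"InRolling":    {"Cancelling", "Paused", "Initializing", "Finalising"},
	"Finalising":   {"Succeeded"},
	"Paused":       {"Cancelling", "InRolling"},
	"Cancelling":   {"Canceled"},
}

// canTransition reports whether the table allows moving from one reason to another.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// signals is a hypothetical summary of what the controller observes.
type signals struct {
	Rollback   bool // the workload is being rolled back
	Paused     bool // the rollout is paused
	Continuous bool // a new release started while one was in flight
	Completed  bool // CurrentStepState is Completed
}

// nextReason picks the reason that follows InRolling, checking the signals
// in the order of the list above (assumed priority).
func nextReason(s signals) string {
	switch {
	case s.Rollback:
		return "Cancelling"
	case s.Paused:
		return "Paused"
	case s.Continuous:
		return "Initializing"
	case s.Completed:
		return "Finalising"
	}
	return "InRolling" // keep rolling
}

func main() {
	next := nextReason(signals{Paused: true})
	fmt.Println(next, canTransition("InRolling", next))
	fmt.Println(canTransition("Paused", "Succeeded"))
}
```

Keeping the table separate from the decision function makes it easy to check that every decision the function returns is a transition the table allows.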

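Next, let's look at how a normal update proceeds. From here on the state changes shift to Status.CanaryStatus, which I will simply call CanaryStatus.

The core code is below. To make it easier to read, I kept the comments and the core logic and removed the logging: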

// canary.go
func (r *rolloutContext) runCanary() error {
    canaryStatus := r.newStatus.CanaryStatus
    // init canary status
    if canaryStatus.CanaryRevision == "" {
        canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateUpgrade
        canaryStatus.CanaryRevision = r.workload.CanaryRevision
        canaryStatus.CurrentStepIndex = 1
        canaryStatus.RolloutHash = r.rollout.Annotations[util.RolloutHashAnnotation]
    }

    // update canary status
    batch, err := r.batchControl.FetchBatchRelease()
    if err != nil {
        canaryStatus.CanaryReplicas = r.workload.CanaryReplicas
        canaryStatus.CanaryReadyReplicas = r.workload.CanaryReadyReplicas
    } else {
        canaryStatus.CanaryReplicas = batch.Status.CanaryStatus.UpdatedReplicas
        canaryStatus.CanaryReadyReplicas = batch.Status.CanaryStatus.UpdatedReadyReplicas
    }

    switch canaryStatus.CurrentStepState {
    case rolloutv1alpha1.CanaryStepStateUpgrade:
        done, err := r.doCanaryUpgrade()
        if err != nil {
            return err
        } else if done {
            canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateTrafficRouting
            canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
        }

    case rolloutv1alpha1.CanaryStepStateTrafficRouting:
        done, err := r.doCanaryTrafficRouting()
        if err != nil {
            return err
        } else if done {
            canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
            canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateMetricsAnalysis
        }
        expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
        r.recheckTime = &expectedTime

    case rolloutv1alpha1.CanaryStepStateMetricsAnalysis:
        done, err := r.doCanaryMetricsAnalysis()
        if err != nil {
            return err
        } else if done {
            canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStatePaused
        }

    case rolloutv1alpha1.CanaryStepStatePaused:
        done, err := r.doCanaryPaused()
        if err != nil {
            return err
        } else if done {
            canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
            canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateReady
        }

    case rolloutv1alpha1.CanaryStepStateReady:
        // run next step
        if len(r.rollout.Spec.Strategy.Canary.Steps) > int(canaryStatus.CurrentStepIndex) {
            canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
            canaryStatus.CurrentStepIndex++
            canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateUpgrade
        } else {
            canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
            canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateCompleted
        }
        // canary completed
    case rolloutv1alpha1.CanaryStepStateCompleted:
    }

    return nil
}

Reading the code above, we can see that its main job is to handle the state transitions of CanaryStatus.

  1. In the previous listing, when the Reason is ProgressingReasonInitializing, CanaryStatus is reset to an empty value so that it can be initialized here.
  2. Initialize the CurrentStepState, CanaryRevision, CurrentStepIndex, and RolloutHash of CanaryStatus
  3. Update the CanaryReplicas and CanaryReadyReplicas of CanaryStatus
  4. CanaryStatus.CurrentStepState transitions:
  5. If the workload upgrade for the current step is done, CanaryStepStateUpgrade --> CanaryStepStateTrafficRouting
  6. CanaryStepStateTrafficRouting --> CanaryStepStateMetricsAnalysis --> CanaryStepStatePaused
  7. Once the configured pause duration has elapsed, CanaryStepStatePaused --> CanaryStepStateReady; if duration is nil, the step has to be moved to CanaryStepStateReady manually
  8. If there are steps that have not run yet, CanaryStepStateReady --> CanaryStepStateUpgrade, and the index is incremented by 1
  9. If all steps have run, CanaryStepStateReady --> CanaryStepStateCompleted
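
To see the whole cycle at a glance, the step-state transitions listed above can be modelled as a pure function and run over a small plan. This is only a sketch: `advance` is a hypothetical name, and it assumes the work of each state always completes, which the real controller of course has to check on every reconcile.

```go
package main

import "fmt"

type StepState string

const (
	Upgrade         StepState = "StepUpgrade"
	TrafficRouting  StepState = "StepTrafficRouting"
	MetricsAnalysis StepState = "StepMetricsAnalysis"
	Paused          StepState = "StepPaused"
	Ready           StepState = "StepReady"
	Completed       StepState = "Completed"
)

// advance returns the state and 1-based step index that follow (state, index)
// once the work of the current state is done, for a plan with total steps.
func advance(state StepState, index, total int) (StepState, int) {
	switch state {
	case Upgrade:
		return TrafficRouting, index
	case TrafficRouting:
		return MetricsAnalysis, index
	case MetricsAnalysis:
		return Paused, index
	case Paused:
		return Ready, index
	case Ready:
		if total > index {
			return Upgrade, index + 1 // more steps left: start the next one
		}
		return Completed, index
	}
	return state, index // Completed is terminal
}

func main() {
	// Walk a hypothetical two-step plan from the first step to completion.
	state, index := Upgrade, 1
	for state != Completed {
		fmt.Printf("step %d: %s\n", index, state)
		state, index = advance(state, index, 2)
	}
	fmt.Println("final:", state)
}
```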

For the workload update itself, the key state is CanaryStepStateUpgrade.
Its core code is:

func (r *rolloutContext) doCanaryUpgrade() (bool, error) {
    // verify whether batchRelease configuration is the latest
    steps := len(r.rollout.Spec.Strategy.Canary.Steps)
    canaryStatus := r.newStatus.CanaryStatus
    isLatest, err := r.batchControl.Verify(canaryStatus.CurrentStepIndex)
    if err != nil {
        return false, err
    } else if !isLatest {
        return false, nil
    }

    // fetch batchRelease
    batch, err := r.batchControl.FetchBatchRelease()
    if err != nil {
        return false, err
    } else if batch.Status.ObservedReleasePlanHash != util.HashReleasePlanBatches(&batch.Spec.ReleasePlan) ||
        batch.Generation != batch.Status.ObservedGeneration {
        return false, nil
    }
    batchData := util.DumpJSON(batch.Status)    
    cond := util.GetRolloutCondition(*r.newStatus, rolloutv1alpha1.RolloutConditionProgressing)
    cond.Message = fmt.Sprintf("Rollout is in step(%d/%d), and upgrade workload new versions", canaryStatus.CurrentStepIndex, steps)
    r.newStatus.Message = cond.Message
    // promote workload next batch release
    if !batchrelease.IsPromoted(r.rollout, batch, r.workload.IsInRollback) {
        r.recorder.Eventf(r.rollout, corev1.EventTypeNormal, "Progressing", fmt.Sprintf("start upgrade step(%d) canary pods with new versions", canaryStatus.CurrentStepIndex))
        klog.Infof("rollout(%s/%s) will promote batch from(%d) -> to(%d)", r.rollout.Namespace, r.rollout.Name, *batch.Spec.ReleasePlan.BatchPartition+1, canaryStatus.CurrentStepIndex)
        return r.batchControl.Promote(canaryStatus.CurrentStepIndex, r.workload.IsInRollback, false)
    }

    // check whether batchRelease is ready
    if batch.Status.CanaryStatus.CurrentBatchState != rolloutv1alpha1.ReadyBatchState ||
        batch.Status.CanaryStatus.CurrentBatch+1 < canaryStatus.CurrentStepIndex {
        return false, nil
    }
    r.recorder.Eventf(r.rollout, corev1.EventTypeNormal, "Progressing", fmt.Sprintf("upgrade step(%d) canary pods with new versions done", canaryStatus.CurrentStepIndex))
    return true, nil
}
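
The last check in doCanaryUpgrade is easy to misread because CurrentBatch is 0-based while CurrentStepIndex is 1-based. Here is a minimal sketch that isolates just that comparison; `batchReady` is a hypothetical helper, and the state is passed as a plain string to keep the example self-contained.

```go
package main

import "fmt"

// batchReady mirrors the final check in doCanaryUpgrade: the batch must be in
// the Ready state, and the 0-based currentBatch must have caught up with the
// 1-based stepIndex.
func batchReady(batchState string, currentBatch, stepIndex int32) bool {
	if batchState != "Ready" || currentBatch+1 < stepIndex {
		return false
	}
	return true
}

func main() {
	// Rollout is on step 2, so BatchRelease must have finished batch index 1.
	fmt.Println(batchReady("Ready", 0, 2))     // batch is one step behind
	fmt.Println(batchReady("Verifying", 1, 2)) // right batch, not ready yet
	fmt.Println(batchReady("Ready", 1, 2))     // step 2 is done
}
```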

To be continued

6. Handling the termination flow in the RolloutPhaseTerminating phase