Leader Election in High-Availability Mode

Why Leader Election Is Needed

To keep the kube-scheduler and kube-controller-manager components highly available, Kubernetes deploys each of them as multiple replicas. But if every replica watched and processed resources through its informers at the same time, work would be duplicated and results could become inconsistent, so a distributed lock (leader election) mechanism is used to avoid this.

Kubernetes implements this with the leaderelection package: only one instance can hold the lock, and only the instance holding the lock is allowed to run; the other instances keep retrying (polling) to acquire it.

Election Configuration

  • LeaseDuration: how long the lock (lease) is valid; candidates use it to decide whether the current lease has expired, and an expired lease means the holder is considered to have lost the lock
  • RenewDeadline: how long the acting leader keeps retrying to renew the lease before giving up leadership
  • RetryPeriod: the interval between retries when acquiring or renewing the lock
  • Callbacks: user-defined hooks
    • OnStartedLeading func(context.Context): called when this instance starts leading
    • OnStoppedLeading func(): called when this instance stops leading
    • OnNewLeader func(identity string): called whenever a new leader is observed (including when this instance itself becomes the leader)
package main

import (
    "context"
    "flag"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
)

var client *clientset.Clientset

func newLeaseLock(lockname, podname, namespace string) *resourcelock.LeaseLock {
    return &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      lockname,
            Namespace: namespace,
        },
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: podname,
        },
    }
}

func run(lock *resourcelock.LeaseLock, ctx context.Context, id string) {
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(c context.Context) {
                klog.Infof("started leading, holding lock %s", lock.LeaseMeta.Name)
                // the leader's work goes here; this placeholder just blocks forever
                select {}
            },
            OnStoppedLeading: func() {
                klog.Info("no longer the leader, staying inactive.")
            },
            OnNewLeader: func(currentID string) {
                if currentID == id {
                    klog.Info("still the leader!")
                    return
                }
                klog.Infof("new leader is %s", currentID)
            },
        },
    })
}

func main() {
    var (
        leaseLockName      string
        leaseLockNamespace string
        podName            = os.Getenv("POD_NAME")
    )
    flag.StringVar(&leaseLockName, "lease-name", "", "Name of lease lock")
    flag.StringVar(&leaseLockNamespace, "lease-namespace", "default", "Name of lease lock namespace")
    flag.Parse()

    if leaseLockName == "" {
        klog.Fatal("missing lease-name flag")
    }
    if leaseLockNamespace == "" {
        klog.Fatal("missing lease-namespace flag")
    }

    config, err := rest.InClusterConfig()
    if err != nil {
        klog.Fatalf("failed to get in-cluster kubeconfig: %v", err)
    }
    client = clientset.NewForConfigOrDie(config)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    lock := newLeaseLock(leaseLockName, podName, leaseLockNamespace)
    run(lock, ctx, podName)
}
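
Two practical notes on running this example: POD_NAME is usually injected through the downward API (an env entry whose fieldRef points at metadata.name), and the pod's ServiceAccount needs RBAC permission to get, create, and update Lease objects (resource leases in the coordination.k8s.io API group) in the lock's namespace, otherwise no candidate will ever be able to acquire the lock.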

Election Logic

Starting the Election

Resource Lock Types

  • leases (recommended)
  • endpointsleases
  • configmapsleases
  • configmaps
  • endpoints

In recent client-go versions only leases is still functional; the other lock types have been removed, and resourcelock.New returns an error asking you to migrate to leases, as the code below shows.

Creating the Resource Lock

  • Create with resourcelock.New
// Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
    leaseLock := &LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coordinationClient,
        LockConfig: rlc,
    }
    switch lockType {
    case endpointsResourceLock:
        return nil, fmt.Errorf("endpoints lock is removed, migrate to %s", LeasesResourceLock)
    case configMapsResourceLock:
        return nil, fmt.Errorf("configmaps lock is removed, migrate to %s", LeasesResourceLock)
    case LeasesResourceLock:
        return leaseLock, nil
    case endpointsLeasesResourceLock:
        return nil, fmt.Errorf("endpointsleases lock is removed, migrate to %s", LeasesResourceLock)
    case configMapsLeasesResourceLock:
        return nil, fmt.Errorf("configmapsleases lock is removed, migrated to %s", LeasesResourceLock)
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}
  • Create with resourcelock.NewFromKubeconfig
// NewFromKubeconfig will create a lock of a given type according to the input parameters.
// Timeout set for a client used to contact to Kubernetes should be lower than
// RenewDeadline to keep a single hung request from forcing a leader loss.
// Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic.
func NewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) {
    // shallow copy, do not modify the kubeconfig
    config := *kubeconfig
    timeout := renewDeadline / 2
    if timeout < time.Second {
        timeout = time.Second
    }
    config.Timeout = timeout
    leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
    return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc)
}
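
For comparison with the hand-built LeaseLock in the example above, here is a minimal usage sketch of NewFromKubeconfig; the lock name, namespace, and identity are illustration values.

package main

import (
    "time"

    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
)

func main() {
    config, err := rest.InClusterConfig()
    if err != nil {
        klog.Fatalf("failed to get in-cluster kubeconfig: %v", err)
    }

    // renewDeadline should match the RenewDeadline passed to
    // LeaderElectionConfig so the derived client timeout stays below it.
    lock, err := resourcelock.NewFromKubeconfig(
        resourcelock.LeasesResourceLock, // "leases", the only lock type still supported
        "default",                       // namespace of the Lease object
        "example-lock",                  // name of the Lease object
        resourcelock.ResourceLockConfig{Identity: "candidate-a"},
        config,
        10*time.Second,
    )
    if err != nil {
        klog.Fatalf("failed to create resource lock: %v", err)
    }
    _ = lock // pass it as Lock in leaderelection.LeaderElectionConfig
}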

Running the Election

Once the resource lock is ready, call leaderelection.RunOrDie to start the election.

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock:            lock,
    ReleaseOnCancel: true,
    LeaseDuration:   15 * time.Second,
    RenewDeadline:   10 * time.Second,
    RetryPeriod:     2 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(c context.Context) {
            klog.Infof("started leading, holding lock %s", lock.LeaseMeta.Name)
            // the leader's work goes here; this placeholder just blocks forever
            select {}
        },
        OnStoppedLeading: func() {
            klog.Info("no longer the leader, staying inactive.")
        },
        OnNewLeader: func(currentID string) {
            if currentID == id {
                klog.Info("still the leader!")
                return
            }
            klog.Infof("new leader is %s", currentID)
        },
    },
})

Main Election Flow

After the election starts, RunOrDie calls le.Run(ctx), which runs the actual election loop. Run only returns when:

  • ctx is cancelled (the caller aborts the election from the outside)
  • the instance was elected leader but its term ended (renewal failed because of the network or some other reason)

An instance that has never been elected leader stays blocked in le.acquire, continuously competing for the lock.

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer le.config.Callbacks.OnStoppedLeading()

    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

Competing for Leadership

The acquire Loop

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    wait.JitterUntil(func() {
        if !le.config.Coordinated {
            succeeded = le.tryAcquireOrRenew(ctx)
        } else {
            succeeded = le.tryCoordinatedRenew(ctx)
        }
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

Return value behavior:

  • succeeded defaults to false; if the caller cancels ctx while the loop is still competing, wait.JitterUntil stops and acquire returns false
  • if this instance wins the election, succeeded is set to true and cancel() is called explicitly to stop the wait.JitterUntil loop

So acquire returns in only two cases:

  • true: this instance was elected leader
  • false: the election was aborted from the outside

A candidate that has not been elected keeps retrying inside the wait.JitterUntil loop, as sketched below.
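
To make the retry behavior concrete, here is a small self-contained sketch of the wait.JitterUntil pattern that acquire builds on; the 2-second period and the succeed-on-the-third-attempt condition are made up for illustration, and 1.2 mirrors the JitterFactor constant used by the leaderelection package.

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    attempts := 0
    // The body runs roughly every period (with jitter) until the stop channel
    // closes: either the caller cancels ctx, or the body decides it is done
    // and calls cancel() itself, exactly as acquire does after winning.
    wait.JitterUntil(func() {
        attempts++
        fmt.Println("trying to acquire the lock, attempt", attempts)
        if attempts == 3 { // pretend the lock was acquired on the third try
            cancel()
        }
    }, 2*time.Second, 1.2, true, ctx.Done())

    fmt.Println("loop stopped after", attempts, "attempts")
}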

Acquiring and Renewing the Lock

Both acquiring the lock and renewing it are handled by the tryAcquireOrRenew function.

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.NewTime(le.clock.Now())
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. fast path for the leader to update optimistically assuming that the record observed
    // last time is the current version.
    if le.IsLeader() && le.isLeaseValid(now.Time) {
        oldObservedRecord := le.getObservedRecord()
        leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions

        err := le.config.Lock.Update(ctx, leaderElectionRecord)
        if err == nil {
            le.setObservedRecord(&leaderElectionRecord)
            return true
        }
        klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
    }

    // 2. obtain or create the ElectionRecord
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }

        le.setObservedRecord(&leaderElectionRecord)

        return true
    }

    // 3. Record obtained, check the Identity & Time
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.setObservedRecord(oldLeaderElectionRecord)

        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 4. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
        le.metrics.slowpathExercised(le.config.Name)
    } else {
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }

    le.setObservedRecord(&leaderElectionRecord)
    return true
}

How It Works

Lease Term

Whether the current leader is still in its term is judged from le.observedTime, which is updated by le.setObservedRecord whenever a leader change is observed. It is updated when:

  • the lock does not exist yet and this instance creates it successfully
  • the record read from the lock differs from the cached one, meaning the leader changed since the last attempt, so the cache is refreshed
  • the leader's lease expired without being renewed and this instance grabbed the lock

In other words, le.observedTime always marks the moment a leader change was observed, so le.observedTime + LeaseDuration is when the current leader's term ends, as the sketch below shows.
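
A minimal sketch of that check (client-go implements it as LeaderElector.isLeaseValid), using the 15-second LeaseDuration from the example above:

package main

import (
    "fmt"
    "time"
)

// leaseStillValid mirrors the term check described above: the current leader
// is considered valid while "now" is still before observedTime + LeaseDuration.
func leaseStillValid(observedTime time.Time, leaseDuration time.Duration, now time.Time) bool {
    return observedTime.Add(leaseDuration).After(now)
}

func main() {
    observed := time.Now().Add(-10 * time.Second) // leader change observed 10s ago

    fmt.Println(leaseStillValid(observed, 15*time.Second, time.Now())) // true: 5s of the term remain
    fmt.Println(leaseStillValid(observed, 5*time.Second, time.Now()))  // false: the term has expired
}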

Lock Acquisition

Lock acquisition is built on Kubernetes' optimistic concurrency control for resources (normally it is the leader itself that updates the lock record):

  • le.config.Lock.Get(ctx) fetches the lock and records its latest resourceVersion
  • when updating the lock, the saved resourceVersion is sent along with the request
  • the API server compares that resourceVersion with the one currently stored; if they match the update is accepted and succeeds, otherwise it fails
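
A minimal sketch of that optimistic-update pattern, written directly against the coordination/v1 Lease API rather than through resourcelock; the namespace, lease name, and holder identity are illustration values.

package main

import (
    "context"
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    leases := clientset.NewForConfigOrDie(config).CoordinationV1().Leases("default")

    // 1. Get the lease; the returned object carries its current resourceVersion.
    lease, err := leases.Get(context.TODO(), "example-lock", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }

    // 2. Modify it locally; the resourceVersion from the Get stays on the object.
    holder := "candidate-a"
    lease.Spec.HolderIdentity = &holder

    // 3. Update. The API server accepts the write only if the resourceVersion
    //    still matches the stored object; otherwise it returns a Conflict,
    //    which means another candidate changed the lock first.
    if _, err := leases.Update(context.TODO(), lease, metav1.UpdateOptions{}); err != nil {
        if apierrors.IsConflict(err) {
            fmt.Println("lost the race: the lock was updated by someone else")
            return
        }
        panic(err)
    }
    fmt.Println("update accepted: this candidate now holds the lock")
}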