KubeEdge Analysis: reliablesyncs

Preface

reliablesyncs was introduced in KubeEdge v1.2. It is mainly used to support high availability of cloudcore and to prevent data loss during cloudcore failover.

The design proposal is at https://github.com/kubeedge/kubeedge/blob/master/docs/proposals/reliable-message-delivery.md

According to the design document, the goal is to guarantee "At-Least-Once" message delivery. Judging from the code, however, only part of it has been implemented so far; for example, the ACK mechanism is not done yet.


(Figure: reliablemessage-workflow.png)

CRD

reliablesyncs introduces two new CRDs:

  • ClusterObjectSync
  • ObjectSync

From the code, only ObjectSync is actually implemented at the moment; ClusterObjectSync is not, so the analysis below focuses on ObjectSync.

The content of these two CRDs is quite simple: the spec essentially identifies the object (its kind, name, and API version), and the status records the objectResourceVersion.
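
For reference, a minimal sketch of the ObjectSync type, reconstructed from the fields used later in saveSuccessPoint; this is not a verbatim copy of the upstream API definition:

// Sketch of the ObjectSync CRD type; field names are taken from the code
// quoted later in this article, the rest is standard Kubernetes API plumbing.
// metav1 = k8s.io/apimachinery/pkg/apis/meta/v1
type ObjectSync struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   ObjectSyncSpec   `json:"spec,omitempty"`
    Status ObjectSyncStatus `json:"status,omitempty"`
}

// ObjectSyncSpec identifies the object being tracked.
type ObjectSyncSpec struct {
    ObjectAPIVersion string `json:"objectAPIVersion,omitempty"`
    ObjectKind       string `json:"objectKind,omitempty"`
    ObjectName       string `json:"objectName,omitempty"`
}

// ObjectSyncStatus records the resource version that has been successfully
// delivered to the edge node.
type ObjectSyncStatus struct {
    ObjectResourceVersion string `json:"objectResourceVersion,omitempty"`
}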

Entry points

Looking at the code, reliablesyncs-related logic is spread across several places, mainly under cloudhub and synccontroller.

cloudhub

cloudhub's Start method initializes a new ObjectSyncController via newObjectSyncController, then calls WaitForCacheSync and waits until the cache sync completes.

Start

newObjectSyncController

func newObjectSyncController() *hubconfig.ObjectSyncController {
    config, err := buildConfig()
    if err != nil {
        klog.Errorf("Failed to build config, err: %v", err)
        os.Exit(1)
    }

    crdClient := versioned.NewForConfigOrDie(config)
    crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)

    clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
    objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()

    sc := &hubconfig.ObjectSyncController{
        CrdClient: crdClient,

        ClusterObjectSyncInformer: clusterObjectSyncInformer,
        ObjectSyncInformer:        objectSyncInformer,

        ClusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
        ObjectSyncSynced:        objectSyncInformer.Informer().HasSynced,

        ClusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
        ObjectSyncLister:        objectSyncInformer.Lister(),
    }

    go sc.ClusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
    go sc.ObjectSyncInformer.Informer().Run(beehiveContext.Done())

    return sc
}

It first initializes a crdClient and a crdFactory.
crdClient is a set of clients built on client-go's rest.Interface, wrapped in the Clientset struct.
The Clientset contains three clients: devicesV1alpha1, reliablesyncsV1alpha1, and DiscoveryClient. Each client adds a few interfaces on top of the REST client.
crdFactory is a sharedInformerFactory configured with a resync period of 0, which means no automatic resync. This makes sense: these objects exist purely for synchronization, so having client-go resync them automatically would be redundant.
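
As an illustration of the Clientset layout (the standard client-gen output pattern; field names and package aliases are approximations, not copied from the generated file):

// Approximate shape of the generated Clientset: one typed client per
// API group/version, plus the discovery client from client-go.
type Clientset struct {
    *discovery.DiscoveryClient                                              // k8s.io/client-go/discovery
    devicesV1alpha1       *devicesv1alpha1.DevicesV1alpha1Client            // typed client for devices/v1alpha1
    reliablesyncsV1alpha1 *reliablesyncsv1alpha1.ReliablesyncsV1alpha1Client // typed client for reliablesyncs/v1alpha1
}

// ReliablesyncsV1alpha1 returns the typed client that is used later as
// CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(namespace)...
func (c *Clientset) ReliablesyncsV1alpha1() reliablesyncsv1alpha1.ReliablesyncsV1alpha1Interface {
    return c.reliablesyncsV1alpha1
}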

The code below is generated with client-go's code generators (via the following generate-groups.sh invocation); a quick look:

${SCRIPT_ROOT}/cloud/hack/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/kubeedge/kubeedge/cloud/pkg/client github.com/kubeedge/kubeedge/cloud/pkg/apis \
"devices:v1alpha1 reliablesyncs:v1alpha1" \
--go-header-file ${SCRIPT_ROOT}/cloud/hack/boilerplate/boilerplate.txt

type sharedInformerFactory struct {
    client           versioned.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}

Now look at sharedInformerFactory: this is essentially the standard structure defined by client-go, and the generated factory additionally implements the Devices() and Reliablesyncs() methods.

Devices() and Reliablesyncs() each call the corresponding New method:

type version struct {
    factory          internalinterfaces.SharedInformerFactory
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
    return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}


// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer

// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

// TweakListOptionsFunc is a function that transforms a v1.ListOptions.
type TweakListOptionsFunc func(*v1.ListOptions)

The call chain here is fairly long; it is only noted here and not examined in detail for now.

Next, the two informers clusterObjectSyncInformer and objectSyncInformer are initialized:

// Interface provides access to all the informers in this group version.
type Interface interface {
    // ClusterObjectSyncs returns a ClusterObjectSyncInformer.
    ClusterObjectSyncs() ClusterObjectSyncInformer
    // ObjectSyncs returns a ObjectSyncInformer.
    ObjectSyncs() ObjectSyncInformer
}

type version struct {
    factory          internalinterfaces.SharedInformerFactory
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
    return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// ClusterObjectSyncs returns a ClusterObjectSyncInformer.
func (v *version) ClusterObjectSyncs() ClusterObjectSyncInformer {
    return &clusterObjectSyncInformer{factory: v.factory, tweakListOptions: v.tweakListOptions}
}

// ObjectSyncs returns a ObjectSyncInformer.
func (v *version) ObjectSyncs() ObjectSyncInformer {
    return &objectSyncInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

The methods above are all generated by client-go.

Then crdClient, ClusterObjectSyncInformer, objectSyncInformer, and so on are stored in the ObjectSyncController object for later use.
After that, Informer().Run is called; let's take a closer look.

Informer().Run

Informer() returns a cache.SharedIndexInformer, which is defined in client-go; Run starts the informer.
Run exits when beehiveContext.Done() is closed.

WaitForCacheSync

Back in the Start method, cache.WaitForCacheSync is called:

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shutdown
// callers should prefer WaitForNamedCacheSync()
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollImmediateUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}

This simply polls the functions passed in as arguments, ClusterObjectSyncSynced and ObjectSyncSynced, until they all return true.
ClusterObjectSyncSynced was initialized earlier in newObjectSyncController as clusterObjectSyncInformer.Informer().HasSynced:

func (f *clusterObjectSyncInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer)
}

So what ultimately gets called is the InformerFor method of internalinterfaces.SharedInformerFactory:

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

This is the key to understanding the whole informer initialization. InformerFor takes two arguments: the object type to be watched, and a function that creates the informer.
For example, here the watched object is ClusterObjectSync, and the NewInformerFunc passed in is defaultInformer:

// NewFilteredClusterObjectSyncInformer constructs a new informer for ClusterObjectSync type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredClusterObjectSyncInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options)
            },
            WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().Watch(options)
            },
        },
        &reliablesyncsv1alpha1.ClusterObjectSync{},
        resyncPeriod,
        indexers,
    )
}

Here you can see that creating the NewSharedIndexInformer mainly means providing the ListFunc and WatchFunc, which in turn call client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options) and Watch(options).

To recap the call chain: WaitForCacheSync-->ClusterObjectSyncSynced-->clusterObjectSyncInformer.Informer().HasSynced-->f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer)-->defaultInformer-->NewFilteredClusterObjectSyncInformer

So for ClusterObjectSyncSynced, what is actually being listed and watched are the ClusterObjectSync objects.
For ObjectSyncSynced, the ObjectSync objects are watched:
WaitForCacheSync-->ObjectSyncSynced-->objectSyncInformer.Informer().HasSynced-->f.factory.InformerFor(&reliablesyncsv1alpha1.ObjectSync{}, f.defaultInformer)-->defaultInformer-->NewFilteredObjectSyncInformer

Back to Start

So in cloudhub's Start, what really happens is listing and watching the ClusterObjectSync and ObjectSync objects.
Next, a ChannelMessageQueue object is initialized, and the ObjectSyncController is stored inside it (this looks a bit odd at first; its purpose is analyzed below).

Then the ChannelMessageQueue's DispatchMessage method is called. This method dispatches cloud messages to the edge; now that the queue carries an ObjectSyncController, we need to see what role that controller plays during message dispatch.
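
Putting these steps together, the Start flow described so far can be sketched roughly as follows. This is a paraphrase of the steps above, not a quote from the repository; the receiver type and the NewChannelMessageQueue signature in particular are assumptions:

// Rough sketch of the cloudhub Start flow described above; names and
// signatures are approximations based on the text, not exact upstream code.
func (a *cloudHub) Start() {
    // 1. build the ObjectSyncController (crdClient plus the two informers)
    objectSyncController := newObjectSyncController()

    // 2. wait until the ClusterObjectSync/ObjectSync caches are populated
    if !cache.WaitForCacheSync(beehiveContext.Done(),
        objectSyncController.ClusterObjectSyncSynced,
        objectSyncController.ObjectSyncSynced) {
        klog.Errorf("unable to sync caches for objectSyncController")
        os.Exit(1)
    }

    // 3. the message queue keeps a reference to the ObjectSyncController so
    //    that DispatchMessage can consult ObjectSync versions before queueing
    messageq := channelq.NewChannelMessageQueue(objectSyncController)

    // 4. start dispatching cloud-to-edge messages
    go messageq.DispatchMessage()

    // ... start the websocket/quic servers, etc.
}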

DispatchMessage

DispatchMessage-->addMessageToQueue-->BuildObjectSyncName

    if !isDeleteMessage(msg) {
        // If the message doesn't exist in the store, then compare it with
        // the version stored in the database
        if !exist {
            resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
            resourceUID, err := GetMessageUID(*msg)
            if err != nil {
                klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
                return
            }

            objectSync, err := q.ObjectSyncController.ObjectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
            if err == nil && msg.GetResourceVersion() <= objectSync.ResourceVersion {
                return
            }
        }

Here the ObjectSyncController finally shows up: the code compares the resource version carried in the msg against the version recorded in the ObjectSync object, and if the ObjectSync version is not lower (i.e. a version at least this new has already been synced), the message is not processed.

saveSuccessPoint

When the WebSocket server is started, OnRegister is registered as the callback for new connections:

// OnRegister register node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
    nodeID := connection.ConnectionState().Headers.Get("node_id")
    projectID := connection.ConnectionState().Headers.Get("project_id")

    if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
        mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
    }

    io := &hubio.JSONIO{Connection: connection}
    go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}

OnRegister extracts the node_id and project_id from the connection headers, then calls ServeConn.

ServeConn --> handler (InitHandler) --> ListMessageWriteLoop --> handleMessage --> sendMsg --> saveSuccessPoint
                                     |
                                     +--> MessageWriteLoop --> handleMessage --> sendMsg --> saveSuccessPoint

ServeConn eventually reaches the saveSuccessPoint method:

// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
    nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID)
    if err != nil {
        klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
        stopServe <- messageQueueDisconnect
        return
    }
    nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID)
    if err != nil {
        klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
        stopServe <- messageQueueDisconnect
        return
    }

    for {
        select {
        case <-stopSendMsg:
            klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
            return
        default:
            mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message")
        }
    }
}

HubInfo contains only two pieces of information, projectID and nodeID. MessageWriteLoop uses the nodeID to take the cached messages out of the MessageQueue and hands them to mh.handleMessage.
Note that these should only be cloud-to-edge messages (only DispatchMessage writes into the MessageQueue).

func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
    if msg.GetGroup() == edgeconst.GroupResource {
        resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
        resourceName, _ := edgemessagelayer.GetResourceName(*msg)
        resourceType, _ := edgemessagelayer.GetResourceType(*msg)
        resourceUID, err := channelq.GetMessageUID(*msg)
        if err != nil {
            return
        }

        objectSyncName := synccontroller.BuildObjectSyncName(info.NodeID, resourceUID)

        if msg.GetOperation() == beehiveModel.DeleteOperation {
            nodeStore.Delete(msg)
            mh.deleteSuccessPoint(resourceNamespace, objectSyncName)
            return
        }

        objectSync, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
        if err == nil {
            objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
            _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSync)
            if err != nil {
                klog.Errorf("Failed to update objectSync: %v, resourceType: %s, resourceNamespace: %s, resourceName: %s",
                    err, resourceType, resourceNamespace, resourceName)
            }
        } else if err != nil && apierrors.IsNotFound(err) {
            objectSync := &v1alpha1.ObjectSync{
                ObjectMeta: metav1.ObjectMeta{
                    Name: objectSyncName,
                },
                Spec: v1alpha1.ObjectSyncSpec{
                    ObjectAPIVersion: "",
                    ObjectKind:       resourceType,
                    ObjectName:       resourceName,
                },
            }
            _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(objectSync)
            if err != nil {
                klog.Errorf("Failed to create objectSync: %s, err: %v", objectSyncName, err)
                return
            }

            objectSyncStatus, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
            if err != nil {
                klog.Errorf("Failed to get objectSync: %s, err: %v", objectSyncName, err)
            }
            objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
            mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)
        }
    }

    // TODO: save device info
    if msg.GetGroup() == deviceconst.GroupTwin {
    }
    klog.Infof("saveSuccessPoint successfully for message: %s", msg.GetResource())
}

From the earlier analysis, saveSuccessPoint is likewise only used on the cloud-to-edge message path.

First the msg's Group is checked. For native K8s resources the Group is "resource", which is what is handled here.
For device-related messages the group is "twin", which is not handled yet.

Then an objectSyncName is built from the NodeID and the resourceUID; this name serves as the primary key for everything stored about the ObjectSync.
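
A plausible sketch of how this name is built, assuming it simply joins the node name and the object UID with a dot (the exact upstream implementation may differ):

// Plausible sketch of BuildObjectSyncName: the per-node, per-object key is
// assumed to be "<nodeName>.<objectUID>".
func BuildObjectSyncName(nodeName, objectUID string) string {
    return nodeName + "." + objectUID
}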

For a delete operation, the msg is removed from the nodeStore and the success point keyed by objectSyncName is deleted.

Otherwise the ObjectSync object is queried from K8s by objectSyncName:

  • If it is found, the ObjectSync object is updated.
  • If it is not found, a new ObjectSync object is created.

The new ObjectSync object carries the objectSyncName, resourceType, and resourceName. After it is created, the object is fetched again by name and objectSyncStatus.Status.ObjectResourceVersion is updated to the resourceVersion from the msg. (Why two steps rather than setting the ResourceVersion at creation time? Most likely because ObjectResourceVersion lives in the status, which is written through UpdateStatus rather than Create.)

Summary

Looking at the whole flow, ObjectSync records the operations on native K8s resources that the cloud needs to deliver to the edge, persisting them in ObjectSync objects (which are themselves stored in K8s).

During DispatchMessage, the version in the ObjectSync object is first compared with the version in the msg; if the ObjectSync version is not lower, the operation is dropped, because a version at least that new has already been synced.

Note that this only guarantees that cloud-to-edge delivery is free of duplicate or out-of-order sends; it does not guarantee that the operation actually reaches the edge and is executed correctly. (It is unclear whether this can become a bug: if the operation is recorded in ObjectSync but the edge node happens to be down at that moment, then by the current logic, unless the resource is operated on again later, it will not be synced after the edge node recovers.)

synccontroller

Register

The Register method creates a SyncController:

func Register(ec *configv1alpha1.SyncController, kubeAPIConfig *configv1alpha1.KubeAPIConfig) {
    config.InitConfigure(ec, kubeAPIConfig)
    core.Register(newSyncController(ec.Enable))
}

func newSyncController(enable bool) *SyncController {
    config, err := buildConfig()
    if err != nil {
        klog.Errorf("Failed to build config, err: %v", err)
        os.Exit(1)
    }
    kubeClient := kubernetes.NewForConfigOrDie(config)
    crdClient := versioned.NewForConfigOrDie(config)

    kubeSharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)
    crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)

    podInformer := kubeSharedInformers.Core().V1().Pods()
    configMapInformer := kubeSharedInformers.Core().V1().ConfigMaps()
    ...

    sctl := &SyncController{
        enable: enable,

        podInformer:               podInformer,
        configMapInformer:         configMapInformer,
        ...

        podSynced:               podInformer.Informer().HasSynced,
        configMapSynced:         configMapInformer.Informer().HasSynced,
        ...

        podLister:               podInformer.Lister(),
        configMapLister:         configMapInformer.Lister(),
        ...
    }

    return sctl
}

newSyncController initializes a collection of informers and stores them on the controller.

Start

The Start method runs each informer and waits for all of them to finish syncing, then launches a goroutine:

go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())

That is, reconcile() runs every 5 seconds until the whole program exits.

reconcile

func (sctl *SyncController) reconcile() {
    allClusterObjectSyncs, err := sctl.clusterObjectSyncLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the ClusterObjectSyncs: %v", err)
    }
    sctl.manageClusterObjectSync(allClusterObjectSyncs)

    allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the ObjectSyncs: %v", err)
    }
    sctl.manageObjectSync(allObjectSyncs)

    sctl.manageCreateFailedObject()
}

From the code, manageClusterObjectSync is an empty method, presumably in preparation for multi-tenancy (cluster-scoped resources) later.
manageObjectSync is namespace-scoped, covering the sync of Pods, ConfigMaps, Secrets, Services, and Endpoints:

// Compare the namespace scope objects that have been persisted to the edge with the namespace scope objects in K8s,
// and generate update and delete events to the edge
func (sctl *SyncController) manageObjectSync(syncs []*v1alpha1.ObjectSync) {
    for _, sync := range syncs {
        switch sync.Spec.ObjectKind {
        case model.ResourceTypePod:
            sctl.managePod(sync)
        case model.ResourceTypeConfigmap:
            sctl.manageConfigMap(sync)
        case model.ResourceTypeSecret:
            sctl.manageSecret(sync)
        case commonconst.ResourceTypeService:
            sctl.manageService(sync)
        case commonconst.ResourceTypeEndpoints:
            sctl.manageEndpoint(sync)
        // TODO: add device here
        default:
            klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
        }
    }
}

managePod

func (sctl *SyncController) managePod(sync *v1alpha1.ObjectSync) {
    pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)

    nodeName := getNodeName(sync.Name)

    if err != nil {
        if apierrors.IsNotFound(err) {
            pod = &v1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      sync.Spec.ObjectName,
                    Namespace: sync.Namespace,
                    UID:       types.UID(getObjectUID(sync.Name)),
                },
            }
        } else {
            klog.Errorf("Failed to manage pod sync of %s in namespace %s: %v", sync.Name, sync.Namespace, err)
            return
        }
    }
    sendEvents(err, nodeName, sync, model.ResourceTypePod, pod.ResourceVersion, pod)
}

It first looks up the Pod object through the informer's lister and derives the node name from the sync object's name, then calls sendEvents.
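
getNodeName (and its counterpart getObjectUID) presumably just invert BuildObjectSyncName; a sketch under the same "<nodeName>.<objectUID>" assumption as above:

// Sketches of the inverse helpers, assuming the "<nodeName>.<objectUID>"
// convention. Object UIDs contain dashes but no dots, so the last
// dot-separated segment is the UID and everything before it is the node name.
func getNodeName(syncName string) string {
    parts := strings.Split(syncName, ".")
    return strings.Join(parts[:len(parts)-1], ".")
}

func getObjectUID(syncName string) string {
    parts := strings.Split(syncName, ".")
    return parts[len(parts)-1]
}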

sendEvents

func sendEvents(err error, nodeName string, sync *v1alpha1.ObjectSync, resourceType string,
    objectResourceVersion string, obj interface{}) {
    if err != nil && apierrors.IsNotFound(err) {
        //trigger the delete event
        klog.Infof("%s: %s has been deleted in K8s, send the delete event to edge", resourceType, sync.Spec.ObjectName)
        msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.DeleteOperation, obj)
        beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
        return
    }

    if sync.Status.ObjectResourceVersion == "" {
        klog.Errorf("The ObjectResourceVersion is empty in status of objectsync: %s", sync.Name)
        return
    }

    if CompareResourceVersion(objectResourceVersion, sync.Status.ObjectResourceVersion) > 0 {
        // trigger the update event
        klog.V(4).Infof("The resourceVersion: %s of %s in K8s is greater than in edgenode: %s, send the update event", objectResourceVersion, resourceType, sync.Status.ObjectResourceVersion)
        msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.UpdateOperation, obj)
        beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
    }
}

There are three checks here:

  • If the object has been deleted in K8s, a DeleteOperation is sent to the edge node (delivered through cloudhub).
  • If the ObjectSync carries no resource version, an error is logged and nothing is done (a failed UpdateStatus of the resourceVersion earlier would leave this field empty).
  • If the object's version in K8s is higher than the version already synced, an UpdateOperation is sent (a sketch of such a version comparison follows this list).
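
CompareResourceVersion is not shown above; conceptually it compares the two resourceVersions numerically. A sketch, assuming both values parse as unsigned integers (which is how Kubernetes resourceVersions look in practice):

// Sketch of a numeric resourceVersion comparison: returns >0 if rvA is newer
// than rvB, <0 if older, 0 if equal.
func CompareResourceVersion(rvA, rvB string) int {
    a, errA := strconv.ParseUint(rvA, 10, 64)
    b, errB := strconv.ParseUint(rvB, 10, 64)
    if errA != nil || errB != nil {
        // fall back to plain string comparison for non-numeric values
        return strings.Compare(rvA, rvB)
    }
    switch {
    case a > b:
        return 1
    case a < b:
        return -1
    default:
        return 0
    }
}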

manageCreateFailedObject

manageCreateFailedObject in turn calls manageCreateFailedCoreObject (device objects are not handled yet).

manageCreateFailedCoreObject

func (sctl *SyncController) manageCreateFailedCoreObject() {
    allPods, err := sctl.podLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the pods: %v", err)
        return
    }

    set := labels.Set{edgemgr.NodeRoleKey: edgemgr.NodeRoleValue}
    selector := labels.SelectorFromSet(set)
    allEdgeNodes, err := sctl.nodeLister.List(selector)
    if err != nil {
        klog.Errorf("Filed to list all the edge nodes: %v", err)
        return
    }

    for _, pod := range allPods {
        if !isFromEdgeNode(allEdgeNodes, pod.Spec.NodeName) {
            continue
        }
        // Check whether the pod is successfully persisted to edge
        _, err := sctl.objectSyncLister.ObjectSyncs(pod.Namespace).Get(BuildObjectSyncName(pod.Spec.NodeName, string(pod.UID)))
        if err != nil && apierrors.IsNotFound(err) {
            msg := buildEdgeControllerMessage(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name, model.InsertOperation, pod)
            beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
        }

        // TODO: add send check for service and endpoint
    }
}

The logic here: first list the pods that should run on edge nodes, then check whether each pod has a corresponding ObjectSync. If not, the earlier send is considered to have failed, and an Insert message is re-sent to the edge.
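
isFromEdgeNode is not shown above; a minimal sketch, assuming it only checks whether the pod's node name appears in the list of edge nodes selected by the label query:

// Minimal sketch of isFromEdgeNode: true if the pod's node is one of the
// edge nodes returned by the labeled node lister (v1 = k8s.io/api/core/v1).
func isFromEdgeNode(edgeNodes []*v1.Node, nodeName string) bool {
    for _, node := range edgeNodes {
        if node.Name == nodeName {
            return true
        }
    }
    return false
}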

Summary

As we can see, SyncController is essentially a periodic task that compares the resource versions in K8s with the resource versions recorded in ObjectSync; if the K8s version is higher, the resource is synced to the edge node again.
If an object has no ObjectSync entry at all, it is also synced once.
However, the problem raised earlier remains: if the ObjectSync write succeeded but the edge node never applied the change, how is that handled?

Conclusion

Reliable sync persists the messages that the cloud delivers to the edge in the ObjectSync CRD, in order to guarantee that each message is synced to the edge node at least once.

However, the whole flow is not fully implemented yet. From the current code, only the persistence of native K8s resources in the CRD is done; the ACK mechanism and reliable sync for CRDs such as devices are still missing.
