KubeEdge Analysis - reliablesyncs
Introduction
reliablesyncs was added in KubeEdge v1.2. Its purpose is to support high availability for cloudcore, so that no data is lost when cloudcore fails over.
The design is described in https://github.com/kubeedge/kubeedge/blob/master/docs/proposals/reliable-message-delivery.md
According to the design document, the goal is "At-Least-Once" delivery of messages. Judging from the code, however, only part of the design has been implemented so far; the ACK mechanism, for example, is not in place yet.
CRD
reliablesyncs introduces two new CRDs:
- ClusterObjectSync
- ObjectSync
From the code, only ObjectSync is implemented so far; ClusterObjectSync is not, so the rest of this article focuses on ObjectSync.
Both CRDs are very simple: they essentially record only the synced object's type, name, and resourceVersion.
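Judging from how these fields are used in the code quoted later (see saveSuccessPoint below), the ObjectSync type presumably looks roughly like the following sketch. The Spec fields (ObjectAPIVersion, ObjectKind, ObjectName) and the Status field (ObjectResourceVersion) appear in that code; the JSON tags and everything else are my assumptions.
// Hedged reconstruction of the ObjectSync CRD types (v1alpha1); see the
// apis/reliablesyncs/v1alpha1 package for the authoritative definition.
type ObjectSync struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   ObjectSyncSpec   `json:"spec,omitempty"`
    Status ObjectSyncStatus `json:"status,omitempty"`
}

// ObjectSyncSpec records which object is being synced to the edge node.
type ObjectSyncSpec struct {
    ObjectAPIVersion string `json:"objectAPIVersion,omitempty"`
    ObjectKind       string `json:"objectKind,omitempty"`
    ObjectName       string `json:"objectName,omitempty"`
}

// ObjectSyncStatus records the resourceVersion that has been delivered to the edge.
type ObjectSyncStatus struct {
    ObjectResourceVersion string `json:"objectResourceVersion,omitempty"`
}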
Entry points
From the code, reliablesyncs-related logic shows up in a few places, mainly under cloudhub and synccontroller.
cloudhub
cloudhub's Start method creates a new ObjectSyncController via newObjectSyncController and then calls WaitForCacheSync to wait until the caches have synced.
Start
newObjectSyncController
func newObjectSyncController() *hubconfig.ObjectSyncController {
    config, err := buildConfig()
    if err != nil {
        klog.Errorf("Failed to build config, err: %v", err)
        os.Exit(1)
    }
    crdClient := versioned.NewForConfigOrDie(config)
    crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)

    clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
    objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()

    sc := &hubconfig.ObjectSyncController{
        CrdClient:                 crdClient,
        ClusterObjectSyncInformer: clusterObjectSyncInformer,
        ObjectSyncInformer:        objectSyncInformer,
        ClusterObjectSyncSynced:   clusterObjectSyncInformer.Informer().HasSynced,
        ObjectSyncSynced:          objectSyncInformer.Informer().HasSynced,
        ClusterObjectSyncLister:   clusterObjectSyncInformer.Lister(),
        ObjectSyncLister:          objectSyncInformer.Lister(),
    }

    go sc.ClusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
    go sc.ObjectSyncInformer.Informer().Run(beehiveContext.Done())

    return sc
}
First a crdClient and a crdFactory are created.
crdClient is a set of clients built on client-go's rest.Interface, wrapped in the Clientset struct.
The Clientset contains three clients: devicesV1alpha1, reliablesyncsV1alpha1 and DiscoveryClient; each adds typed methods on top of the plain REST client.
crdFactory is a sharedInformerFactory configured with a resync period of 0, i.e. no automatic resync. That makes sense: these objects exist precisely to track sync state, so letting client-go resync them periodically would be redundant.
The code below is all generated by client-go's code generators; here is a quick look.
${SCRIPT_ROOT}/cloud/hack/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/kubeedge/kubeedge/cloud/pkg/client github.com/kubeedge/kubeedge/cloud/pkg/apis \
"devices:v1alpha1 reliablesyncs:v1alpha1" \
--go-header-file ${SCRIPT_ROOT}/cloud/hack/boilerplate/boilerplate.txt
type sharedInformerFactory struct {
    client           versioned.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}
Here is the sharedInformerFactory itself; it is essentially the standard structure generated by client-go, with the Devices() and Reliablesyncs() methods added on top.
Devices() and Reliablesyncs() each call the corresponding group's New function.
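For reference, the generated factory methods are roughly the following (a sketch of the standard client-go codegen output; the package aliases reliablesyncs and devices are assumptions):
// Sketch of the generated factory methods; each hands the factory itself down
// to the per-group informer package.
func (f *sharedInformerFactory) Reliablesyncs() reliablesyncs.Interface {
    return reliablesyncs.New(f, f.namespace, f.tweakListOptions)
}

func (f *sharedInformerFactory) Devices() devices.Interface {
    return devices.New(f, f.namespace, f.tweakListOptions)
}
Each group package then exposes a version struct and a New constructor: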
type version struct {
    factory          internalinterfaces.SharedInformerFactory
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
    return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer

// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

// TweakListOptionsFunc is a function that transforms a v1.ListOptions.
type TweakListOptionsFunc func(*v1.ListOptions)
The chain here is fairly long; I am just noting it down for now rather than digging all the way in.
Next, the two informers clusterObjectSyncInformer and objectSyncInformer are created.
// Interface provides access to all the informers in this group version.
type Interface interface {
    // ClusterObjectSyncs returns a ClusterObjectSyncInformer.
    ClusterObjectSyncs() ClusterObjectSyncInformer
    // ObjectSyncs returns a ObjectSyncInformer.
    ObjectSyncs() ObjectSyncInformer
}

type version struct {
    factory          internalinterfaces.SharedInformerFactory
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
    return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// ClusterObjectSyncs returns a ClusterObjectSyncInformer.
func (v *version) ClusterObjectSyncs() ClusterObjectSyncInformer {
    return &clusterObjectSyncInformer{factory: v.factory, tweakListOptions: v.tweakListOptions}
}

// ObjectSyncs returns a ObjectSyncInformer.
func (v *version) ObjectSyncs() ObjectSyncInformer {
    return &objectSyncInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
All of the methods above are generated by client-go.
The crdClient, ClusterObjectSyncInformer, objectSyncInformer and friends are then stored in the ObjectSyncController object for later use.
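Pieced together from the fields assigned in newObjectSyncController above, hubconfig.ObjectSyncController presumably looks roughly like this (the field names come from that code; the exact types and package aliases are my assumptions):
// Hedged reconstruction of hubconfig.ObjectSyncController.
type ObjectSyncController struct {
    CrdClient versioned.Interface

    // informers for the two reliablesyncs CRDs
    ClusterObjectSyncInformer reliablesyncsinformers.ClusterObjectSyncInformer
    ObjectSyncInformer        reliablesyncsinformers.ObjectSyncInformer

    // HasSynced functions consumed by cache.WaitForCacheSync
    ClusterObjectSyncSynced cache.InformerSynced
    ObjectSyncSynced        cache.InformerSynced

    // listers for reading the cached objects
    ClusterObjectSyncLister reliablesyncslisters.ClusterObjectSyncLister
    ObjectSyncLister        reliablesyncslisters.ObjectSyncLister
}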
Then Informer().Run is called on each informer; let's look at that in a bit more detail.
Informer().Run
Informer() returns a cache.SharedIndexInformer, which is defined in client-go; its Run method starts the informer.
Run stops when beehiveContext.Done() is closed.
WaitForCacheSync
Back in the Start method, cache.WaitForCacheSync is called:
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
// callers should prefer WaitForNamedCacheSync()
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollImmediateUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}
This simply keeps polling the functions passed in, ClusterObjectSyncSynced and ObjectSyncSynced, until all of them return true.
ClusterObjectSyncSynced was initialized in newObjectSyncController as clusterObjectSyncInformer.Informer().HasSynced:
func (f *clusterObjectSyncInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer)
}
So what ultimately gets invoked is the InformerFor method of internalinterfaces.SharedInformerFactory.
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}
This is the key to how the informers get initialized: InformerFor takes two arguments, the object type to be watched and a function that constructs the informer.
Here, for instance, the watched object is ClusterObjectSync and the NewInformerFunc passed in is defaultInformer.
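defaultInformer is generated code as well; it most likely just delegates to NewFilteredClusterObjectSyncInformer with the standard namespace indexer (a sketch of the usual client-go codegen output, not copied from the repo):
// Sketch of the generated defaultInformer for ClusterObjectSync.
func (f *clusterObjectSyncInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredClusterObjectSyncInformer(client, resyncPeriod,
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
        f.tweakListOptions)
}
NewFilteredClusterObjectSyncInformer is where the actual list/watch is wired up: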
// NewFilteredClusterObjectSyncInformer constructs a new informer for ClusterObjectSync type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredClusterObjectSyncInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options)
            },
            WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().Watch(options)
            },
        },
        &reliablesyncsv1alpha1.ClusterObjectSync{},
        resyncPeriod,
        indexers,
    )
}
Creating the SharedIndexInformer thus boils down to supplying the ListFunc and WatchFunc, which call client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options) and .Watch(options) respectively.
To recap the chain: WaitForCacheSync --> ClusterObjectSyncSynced --> clusterObjectSyncInformer.Informer().HasSynced --> f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer) --> defaultInformer --> NewFilteredClusterObjectSyncInformer
So ClusterObjectSyncSynced ultimately amounts to watching ClusterObjectSync objects.
Likewise, ObjectSyncSynced watches ObjectSync objects:
WaitForCacheSync --> ObjectSyncSynced --> objectSyncInformer.Informer().HasSynced --> f.factory.InformerFor(&reliablesyncsv1alpha1.ObjectSync{}, f.defaultInformer) --> defaultInformer --> NewFilteredObjectSyncInformer
Back in Start
So in cloudhub's Start, all of this boils down to syncing the caches for the two object types, ClusterObjectSync and ObjectSync.
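Putting the pieces together, the reliablesyncs-related part of Start presumably looks roughly like the following sketch (based on the flow described in this article, not a verbatim quote; the constructor name NewChannelMessageQueue and the receiver are assumptions, and error handling is simplified):
// Sketch of the reliablesyncs-related portion of cloudhub's Start.
func (a *cloudHub) Start() {
    objectSyncController := newObjectSyncController()

    // Block until both informer caches are populated.
    if !cache.WaitForCacheSync(beehiveContext.Done(),
        objectSyncController.ClusterObjectSyncSynced,
        objectSyncController.ObjectSyncSynced) {
        klog.Errorf("unable to sync caches for objectSyncController")
        os.Exit(1)
    }

    // The message queue keeps a reference to the controller so that
    // DispatchMessage can consult the ObjectSync objects.
    messageq := channelq.NewChannelMessageQueue(objectSyncController)
    go messageq.DispatchMessage()

    // ... the websocket/quic servers and handlers are started here (omitted)
}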
Start then creates a ChannelMessageQueue, which holds the ObjectSyncController (this looks a little odd at first; its use is analyzed below).
It then calls the ChannelMessageQueue's DispatchMessage method. DispatchMessage's job is to dispatch cloud-side messages to edge nodes; now that it also carries an ObjectSyncController, the question is what role that plays in dispatching.
DispatchMessage
DispatchMessage-->addMessageToQueue-->BuildObjectSyncName
if !isDeleteMessage(msg) {
    // If the message doesn't exist in the store, then compare it with
    // the version stored in the database
    if !exist {
        resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
        resourceUID, err := GetMessageUID(*msg)
        if err != nil {
            klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
            return
        }

        objectSync, err := q.ObjectSyncController.ObjectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
        if err == nil && msg.GetResourceVersion() <= objectSync.ResourceVersion {
            return
        }
    }
Here the ObjectSyncController finally makes an appearance: the code compares the version recorded in the ObjectSync object with the version carried in the message, and if the ObjectSync already records a higher version (i.e. a newer version has already been synced), the message is simply dropped.
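The lookup key is produced by BuildObjectSyncName from the node ID and the resource UID. I have not verified the exact implementation, but it is presumably something as simple as:
// Hedged guess at BuildObjectSyncName: the ObjectSync name combines the target
// node and the UID of the object being synced, so the same object synced to
// different nodes gets distinct ObjectSync records.
func BuildObjectSyncName(nodeName, uid string) string {
    return nodeName + "." + uid
}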
saveSuccessPoint
When the websocket server is started, OnRegister is registered as the callback for new connections:
// OnRegister register node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
    nodeID := connection.ConnectionState().Headers.Get("node_id")
    projectID := connection.ConnectionState().Headers.Get("project_id")

    if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
        mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
    }

    io := &hubio.JSONIO{Connection: connection}
    go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}
OnRegister extracts node_id and project_id from the connection headers and then calls ServeConn.
ServeConn --> handler (InitHandler) --> ListMessageWriteLoop --> handleMessage --> sendMsg --> saveSuccessPoint
ServeConn --> handler (InitHandler) --> MessageWriteLoop     --> handleMessage --> sendMsg --> saveSuccessPoint
So ServeConn eventually reaches saveSuccessPoint. Here is MessageWriteLoop:
// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
    nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID)
    if err != nil {
        klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
        stopServe <- messageQueueDisconnect
        return
    }
    nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID)
    if err != nil {
        klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
        stopServe <- messageQueueDisconnect
        return
    }

    for {
        select {
        case <-stopSendMsg:
            klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
            return
        default:
            mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message")
        }
    }
}
HubInfo contains only the projectID and the nodeID. MessageWriteLoop uses the nodeID to fetch the node's queue and store from the MessageQueue and hands the cached messages to mh.handleMessage.
Note that the messages here should only be cloud-to-edge messages, since DispatchMessage is the only place that writes into the MessageQueue.
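For reference, HubInfo is a trivial struct (reconstructed from how it is used above):
// HubInfo identifies which project and edge node a connection belongs to.
type HubInfo struct {
    ProjectID string
    NodeID    string
}
handleMessage eventually calls sendMsg, which in turn calls saveSuccessPoint: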
func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
    if msg.GetGroup() == edgeconst.GroupResource {
        resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
        resourceName, _ := edgemessagelayer.GetResourceName(*msg)
        resourceType, _ := edgemessagelayer.GetResourceType(*msg)
        resourceUID, err := channelq.GetMessageUID(*msg)
        if err != nil {
            return
        }

        objectSyncName := synccontroller.BuildObjectSyncName(info.NodeID, resourceUID)

        if msg.GetOperation() == beehiveModel.DeleteOperation {
            nodeStore.Delete(msg)
            mh.deleteSuccessPoint(resourceNamespace, objectSyncName)
            return
        }

        objectSync, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
        if err == nil {
            objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
            _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSync)
            if err != nil {
                klog.Errorf("Failed to update objectSync: %v, resourceType: %s, resourceNamespace: %s, resourceName: %s",
                    err, resourceType, resourceNamespace, resourceName)
            }
        } else if err != nil && apierrors.IsNotFound(err) {
            objectSync := &v1alpha1.ObjectSync{
                ObjectMeta: metav1.ObjectMeta{
                    Name: objectSyncName,
                },
                Spec: v1alpha1.ObjectSyncSpec{
                    ObjectAPIVersion: "",
                    ObjectKind:       resourceType,
                    ObjectName:       resourceName,
                },
            }
            _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(objectSync)
            if err != nil {
                klog.Errorf("Failed to create objectSync: %s, err: %v", objectSyncName, err)
                return
            }

            objectSyncStatus, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
            if err != nil {
                klog.Errorf("Failed to get objectSync: %s, err: %v", objectSyncName, err)
            }
            objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
            mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)
        }
    }

    // TODO: save device info
    if msg.GetGroup() == deviceconst.GroupTwin {
    }
    klog.Infof("saveSuccessPoint successfully for message: %s", msg.GetResource())
}
As noted above, saveSuccessPoint is also only used on the path where the cloud sends messages to the edge.
It first checks the message's Group: for native Kubernetes resources the Group is "resource", which is the case handled here.
For device-related messages the Group is "twin", which is not handled yet.
Next an objectSyncName is built from the NodeID and the resource UID; this name serves as the key for everything stored about the ObjectSync.
For a delete operation, the message is removed from the nodeStore and the success point keyed by objectSyncName is deleted.
Otherwise the ObjectSync object is looked up in Kubernetes by objectSyncName:
- if it exists, the ObjectSync object is updated;
- if it does not exist, a new ObjectSync object is created.
The new ObjectSync contains the objectSyncName, resourceType and resourceName. After it is created, it is fetched again by name and objectSyncStatus.Status.ObjectResourceVersion is set to the resourceVersion from the message. (Why two steps instead of setting the version at creation time? Most likely because status is a subresource: Create ignores the Status field, so it can only be filled in afterwards via UpdateStatus.)
Recap
Looking at the whole flow, ObjectSync records every native Kubernetes resource operation that the cloud needs to deliver to the edge in an ObjectSync object (which is itself stored in Kubernetes).
During DispatchMessage, the version in the ObjectSync object is compared with the version in the message; if the ObjectSync version is higher, the operation is discarded, since a newer version has already been synced.
Note that this only prevents duplicate or out-of-order delivery from cloud to edge; it does not guarantee that the operation actually reaches the edge and is executed correctly. (I am not sure whether this can turn into a bug: if the operation is recorded in the ObjectSync but the edge node happens to be down at that moment, then with the current logic, unless there is a later operation on the same resource, the resource will not be synced again after the edge node recovers.)
synccontroller
Register
The Register method creates a SyncController:
func Register(ec *configv1alpha1.SyncController, kubeAPIConfig *configv1alpha1.KubeAPIConfig) {
    config.InitConfigure(ec, kubeAPIConfig)
    core.Register(newSyncController(ec.Enable))
}

func newSyncController(enable bool) *SyncController {
    config, err := buildConfig()
    if err != nil {
        klog.Errorf("Failed to build config, err: %v", err)
        os.Exit(1)
    }
    kubeClient := kubernetes.NewForConfigOrDie(config)
    crdClient := versioned.NewForConfigOrDie(config)

    kubeSharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)
    crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)

    podInformer := kubeSharedInformers.Core().V1().Pods()
    configMapInformer := kubeSharedInformers.Core().V1().ConfigMaps()
    ...

    sctl := &SyncController{
        enable: enable,

        podInformer:       podInformer,
        configMapInformer: configMapInformer,
        ...

        podSynced:       podInformer.Informer().HasSynced,
        configMapSynced: configMapInformer.Informer().HasSynced,
        ...

        podLister:       podInformer.Lister(),
        configMapLister: configMapInformer.Lister(),
        ...
    }

    return sctl
}
newSyncController creates a whole batch of informers and stores them on the controller.
Start
The Start method runs each of the informers and waits for their caches to sync, then starts a goroutine:
go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())
i.e. reconcile() runs every 5 seconds until the whole process exits.
reconcile
func (sctl *SyncController) reconcile() {
    allClusterObjectSyncs, err := sctl.clusterObjectSyncLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the ClusterObjectSyncs: %v", err)
    }
    sctl.manageClusterObjectSync(allClusterObjectSyncs)

    allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the ObjectSyncs: %v", err)
    }
    sctl.manageObjectSync(allObjectSyncs)

    sctl.manageCreateFailedObject()
}
From the code, manageClusterObjectSync is an empty method, presumably a placeholder for future multi-tenancy support.
manageObjectSync is namespace-scoped and covers the sync of Pods, ConfigMaps, Secrets, Services and Endpoints:
// Compare the namespace scope objects that have been persisted to the edge with the namespace scope objects in K8s,
// and generate update and delete events to the edge
func (sctl *SyncController) manageObjectSync(syncs []*v1alpha1.ObjectSync) {
    for _, sync := range syncs {
        switch sync.Spec.ObjectKind {
        case model.ResourceTypePod:
            sctl.managePod(sync)
        case model.ResourceTypeConfigmap:
            sctl.manageConfigMap(sync)
        case model.ResourceTypeSecret:
            sctl.manageSecret(sync)
        case commonconst.ResourceTypeService:
            sctl.manageService(sync)
        case commonconst.ResourceTypeEndpoints:
            sctl.manageEndpoint(sync)
        // TODO: add device here
        default:
            klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
        }
    }
}
managePod
func (sctl *SyncController) managePod(sync *v1alpha1.ObjectSync) {
    pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)

    nodeName := getNodeName(sync.Name)

    if err != nil {
        if apierrors.IsNotFound(err) {
            pod = &v1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      sync.Spec.ObjectName,
                    Namespace: sync.Namespace,
                    UID:       types.UID(getObjectUID(sync.Name)),
                },
            }
        } else {
            klog.Errorf("Failed to manage pod sync of %s in namespace %s: %v", sync.Name, sync.Namespace, err)
            return
        }
    }

    sendEvents(err, nodeName, sync, model.ResourceTypePod, pod.ResourceVersion, pod)
}
The Pod is first fetched through the informer's lister and the node name is derived from the ObjectSync name; then sendEvents is called.
sendEvents
func sendEvents(err error, nodeName string, sync *v1alpha1.ObjectSync, resourceType string,
    objectResourceVersion string, obj interface{}) {

    if err != nil && apierrors.IsNotFound(err) {
        // trigger the delete event
        klog.Infof("%s: %s has been deleted in K8s, send the delete event to edge", resourceType, sync.Spec.ObjectName)
        msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.DeleteOperation, obj)
        beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
        return
    }

    if sync.Status.ObjectResourceVersion == "" {
        klog.Errorf("The ObjectResourceVersion is empty in status of objectsync: %s", sync.Name)
        return
    }

    if CompareResourceVersion(objectResourceVersion, sync.Status.ObjectResourceVersion) > 0 {
        // trigger the update event
        klog.V(4).Infof("The resourceVersion: %s of %s in K8s is greater than in edgenode: %s, send the update event", objectResourceVersion, resourceType, sync.Status.ObjectResourceVersion)
        msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.UpdateOperation, obj)
        beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
    }
}
There are three checks here:
- if the object has been deleted from Kubernetes, a DeleteOperation is sent to the edge node (via cloudhub);
- if the ObjectSync has no recorded version, an error is logged and nothing is done (a failed resourceVersion update earlier would leave this field empty);
- if the object's version in Kubernetes is higher than the already-synced version, an UpdateOperation is sent; the version comparison is sketched below.
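resourceVersions are compared numerically here. CompareResourceVersion is presumably along these lines (a sketch; the real implementation may handle parse errors differently):
// CompareResourceVersion compares two resourceVersion strings numerically.
// It returns >0 if rva is newer than rvb, <0 if older, and 0 if equal (or unparsable).
func CompareResourceVersion(rva, rvb string) int {
    a, err := strconv.ParseUint(rva, 10, 64)
    if err != nil {
        return 0
    }
    b, err := strconv.ParseUint(rvb, 10, 64)
    if err != nil {
        return 0
    }
    switch {
    case a > b:
        return 1
    case a < b:
        return -1
    default:
        return 0
    }
}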
manageCreateFailedObject
manageCreateFailedObject in turn calls manageCreateFailedCoreObject (device objects are not handled yet).
manageCreateFailedCoreObject
func (sctl *SyncController) manageCreateFailedCoreObject() {
    allPods, err := sctl.podLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the pods: %v", err)
        return
    }

    set := labels.Set{edgemgr.NodeRoleKey: edgemgr.NodeRoleValue}
    selector := labels.SelectorFromSet(set)
    allEdgeNodes, err := sctl.nodeLister.List(selector)
    if err != nil {
        klog.Errorf("Filed to list all the edge nodes: %v", err)
        return
    }

    for _, pod := range allPods {
        if !isFromEdgeNode(allEdgeNodes, pod.Spec.NodeName) {
            continue
        }
        // Check whether the pod is successfully persisted to edge
        _, err := sctl.objectSyncLister.ObjectSyncs(pod.Namespace).Get(BuildObjectSyncName(pod.Spec.NodeName, string(pod.UID)))
        if err != nil && apierrors.IsNotFound(err) {
            msg := buildEdgeControllerMessage(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name, model.InsertOperation, pod)
            beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
        }
        // TODO: add send check for service and endpoint
    }
}
The idea: list the Pods scheduled onto edge nodes, then check whether each Pod has a corresponding ObjectSync; if not, delivery is assumed to have failed and an Insert message is sent to the edge again.
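isFromEdgeNode is just a membership check of the Pod's node against the edge-node list; a minimal sketch, assuming it matches on node name:
// Sketch of isFromEdgeNode: true if nodeName refers to one of the listed edge nodes.
func isFromEdgeNode(nodes []*v1.Node, nodeName string) bool {
    for _, node := range nodes {
        if node.Name == nodeName {
            return true
        }
    }
    return false
}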
Recap
So the SyncController is a periodic job that compares the resource versions in Kubernetes with those recorded in the ObjectSync objects, and pushes the resource to the edge node whenever the Kubernetes version is newer.
If a resource has no ObjectSync object at all, it is also synced once.
The earlier question remains, though: what happens if writing the ObjectSync succeeded but the edge node did not actually apply the change?
Summary
reliable sync persists the messages that the cloud delivers to the edge in the ObjectSync CRD, so that every message is synced to the edge node at least once.
However, the full flow is not implemented yet: from the current code, only the persistence of native Kubernetes resources in the CRD is done; the ACK mechanism and reliable sync for devices and other CRDs are still missing.