编写一个自定义K8s Controller

在 K8s 中当我们需要监控某个资源的变化并作一系列操作时,使用 K8s 提供的 controller 机制来实现,同时 K8s 官方提供了一个通用库 client-go,通过它可以很容易实现自定义controller.

Client-go & controller 架构

在编写controller 之前,我们需要了解 client-go 对资源监控的整个架构和流程,并需要知道我们所需要自定义的是哪部分组件。

在 client-go 中包含了编写自定义 controller 可以使用的各种机制,这些机制在库的k8s.io/client-go/tools k8s.io/client-go/util中定义。

image

如图所示,图中包含了 client-go 和 controller 整个交互流程。

client-go

在client-go 中主要包含了以下组件:

  • Reflector:通过调用 LISTWATCH 接口与 ApiServer 通信,监听特定资源(此处监听的资源需要我们指定),并把资源的更新动态存入 Delta FIFO 队列
  • Informer:从Delta FIFO队列拿出对象,完成此操作的函数是processLoop。
  • Indexer: 提供线程级别安全来存储对象和key。

custom controller

  • Informer reference: Informer对象引用
  • Indexer reference: Indexer对象引用
  • Resource Event Handlers: 被Informer调用的回调函数,这些函数的作用通常是获取对象的key,并把key放入Work queue,以进一步做处理。
  • Work queue: 工作队列,用于将对象的交付与其处理分离,编写Resource event handler functions以提取传递的对象的key并将其添加到工作队列。此处可以过滤掉我们不关心的信息。
  • Process Item: 用于处理Work queue中的对象,可以有一个或多个其他函数一起处理;这些函数通常使用Indexer reference或Listing wrapper来检索与该键对应的对象。这里就是我们需要自定义的业务逻辑

Sample Controller

这里编写一个简易的 Controller, 用于监听 pod 创建、删除信息,并将信息打印出来。

Controller 逻辑

首先我们需要定义一个这样的 Controller 结构体

type Controller struct {
    indexer  cache.Indexer  // Indexer 的引用
    queue    workqueue.RateLimitingInterface  //workqueue 的引用
    informer cache.Controller // Informer 的引用
}

定义 Controller 的工作流

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
    defer runtime.HandleCrash()

    defer c.queue.ShutDown()

    klog.Info("Starting pod controller")

    go c.informer.Run(stopCh)   // 启动 informer

    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Time out waitng for caches to sync"))
        return
    }

    // 启动多个 worker 处理 workqueue 中的对象
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    <-stopCh
    klog.Info("Stopping Pod controller")
}

具体处理 worker queue 中对象的流程

func (c *Controller) runWorker() {
    // 启动无限循环,接收并处理消息
    for c.processNextItem() {

    }
}
// 从 workqueue 中获取对象,并打印信息。
func (c *Controller) processNextItem() bool {
    key, shutdown := c.queue.Get()
    // 退出
    if shutdown {
        return false
    }

    // 标记此key已经处理
    defer c.queue.Done(key)

    // 将key对应的 object 的信息进行打印
    err := c.syncToStdout(key.(string))

    c.handleError(err, key)
    return true
}

// 获取 key 对应的 object,并打印相关信息
func (c *Controller) syncToStdout(key string) error {
    obj, exists, err := c.indexer.GetByKey(key)
    if err != nil {
        klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }
    if !exists {
        fmt.Printf("Pod %s does not exist")
    } else {
        fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*core_v1.Pod).GetName())
    }
    return nil
}

Main 函数逻辑

func main() {
    var kubeconfig string
    var master string
    // 从外部获取集群信息(kube.config)
    flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file")
    // 获取集群master 的url
    flag.StringVar(&master, "master", "", "master url")

    // 读取构建 config
    config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }

    // 创建 k8s client
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatal(err)
    }

    // 指定 ListWatcher 在所有namespace下监听 pod 资源
    podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())

    // 创建 workqueue
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // 创建 indexer 和 informer
    indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
        // 当有 pod 创建时,根据 Delta queue 弹出的 object 生成对应的Key,并加入到 workqueue中。此处可以根据Object的一些属性,进行过滤
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                queue.Add(key)
            }
        },
        // pod 删除操作
        DeleteFunc: func(obj interface{}) {
            // DeletionHandlingMetaNamespaceKeyFunc 会在生成key 之前检查。因为资源删除后有可能会进行重建等操作,监听时错过了删除信息,从而导致该条记录是陈旧的。
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    }, cache.Indexers{})

    controller := pkg.NewController(queue, indexer, informer)

    stop := make(chan struct{})

    defer close(stop)
    // 启动 controller
    go controller.Run(1, stop)

    select {}

}

后记

Controller 的整体流程综上,如果我们使用 CRD ,Controller 流程也不会变化很多,更多的是需要对CRD 的监控,并根据变化创建对应的 Deployment 或 StatefulSet。同时 CRD 也需要增加对应的 Validation 和 Spec 的解析。这一部分我们下一篇继续。

To Be Continue...

如果喜欢,请关注我的公众号,或者查看我的博客 http://packyzbq.gitee.io. 我会不定时的发送我自己的学习记录,大家互相学习交流哈~

weixin

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容

  • 一、kubernetes 集群的几种访问方式 在实际开发过程中,若想要获取 kubernetes 中某个资源(比如...
    田飞雨阅读 11,807评论 4 23
  • 一、Customer Resource 自定义资源是Kubernetes API的扩展,本文将讨什么时候应该向Ku...
    昔召阆梦阅读 1,240评论 0 0
  • 今天下午放学,我就和儿子去姥姥家送粽子了。去了就去看姥姥喂的小鸡,然后看了一会儿电视,天快要黑了,我们就回家了。
    泰康_b56f阅读 54评论 0 0
  • 今天是2018年12月30号,元旦假期的第一天,这一年的倒数第二天,迎来了武汉的初雪。 早上醒来就被朋友圈的雪景刷...
    菜菜___阅读 399评论 2 4
  • 夜, 你有眼睛吗 世上所有灵魂,肉体 与你相拥 许你那冷寂的温暖 你看到了吗 一双坚毅的双眼 静默的与你对望 她...
    JessieLWW阅读 260评论 0 0