实现一个k8s Admission Webhook

引言

在实际的 kubernetes 使用场景下,我们可能需要对提交到集群的任务进行自定义的准入控制,例如对特定的任务添加指定的label 或者 annotations,再比如需要限制提交任务的资源配比,亦或是为了实现安全管控从而拒绝特权容器等等。
我们不妨考虑 kubernetes 的准入控制特性来实现我们的需求。

Admission Webhook

Admission webhook 是k8s apiserver对外提供的一种扩展能力,在 kubernetes apiserver 中包含两个特殊的准入控制器: MutatingAdmissionWebhookValidatingAdmissionWebhook 。这两个控制器将发送准入请求到外部的 HTTP 回调服务并接收一个准入响应。通过自己实现这个外部的 http 回调服务,我们可以实现对k8s原生或者自定义控制器的行为控制,比如你想阻止某个控制器的行为,或拦截某个控制器的资源修改等。

实现步骤

本文以 mutatingwebhook为例

定义一个mutatingwebhookconfiguration 资源

apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: mutating-demo
  annotations:
    cert-manager.io/inject-ca-from: default/mutating-demo
webhooks:
  - admissionReviewVersions: # admissionReviewVersions 请求的版本
    - v1beta1
    - v1
    clientConfig: # 客户端配置
      caBundle: # ca证书, 用于 apiserver 访问 webhook 服务时携带,本文我们通过 cert-manager 的cainjector来实现自动注入,可以不需要此字段
      service: # 调用服务相关配置,这里是一个k8s的service,访问地址是<name>.<namespace>.svc:<port>/<path>
        name: mutating-demo
        namespace: default
        path: /mutating-demo
        port: 8003
    name: mutating-demo.example.com # webhook名称
    failurePolicy: Fail  # 调用失败策略,Ignore为忽略错误, failed表示admission会处理错误
    matchPolicy: Equivalent
    rules: # 规则
      - apiGroups:
          - apps
        apiVersions:
          - v1
        operations:
          - CREATE
          - UPDATE
        resources:
          - deployments
        scope: '*' # 匹配范围,"*" 匹配所有资源,但不包括子资源,"*/*" 匹配所有资源,包括子资源
    sideEffects: None  # 这个表示webhook是否存在副作用,主要针对 dryRun 的请求
    timeoutSeconds: 10
  # reinvocationPolicy表示再调度策略,因为webhook本身没有顺序性,因此每个修改后可能又被其他webhook修改,所以提供
  # 一个策略表示是否需要被多次调用,Never 表示只会调度一次,IfNeeded 表示资源被修改后会再调度这个webhook
    reinvocationPolicy: Never 

Webhook 执行流程

Admission Webhook 本质是一个 apiserver 的 webhook 调用,如下是 apiserver 的处理流程:


image.png

apiserver 通过读取 mutatingwebhookconfigurationvalidatingwebhookconfiguration 资源文件的目标地址,然后回调我们自定义的服务。

                                            ┌──────────────────────────────────┐
             ┌─────────────────┐            │                                  │
    apply    │                 │    read    │  validatingwebhookconfiguration  │
────────────►│    api-server   │◄───────────┤                                  │
             │                 │            │  mutatingwebhookconfiguration    │
             └────────┬────────┘            │                                  │
                      │                     └──────────────────────────────────┘
                      │
                      │  回调
                      │
                      │
             ┌────────▼────────┐
             │                 │
             │  webhookservice │
             │                 │
             └─────────────────┘

api-server 发起的请求是一串json数据格式,header需要设置content-type为application/json, 我们看看请求的 body :

curl -X POST \
  http://webhook-url \
  -H 'content-type: application/json' \
  -d '{
  "apiVersion": "admission.k8s.io/v1",
  "kind": "AdmissionReview",
  "request": {
    ...
  }
}'

返回给 apiserver 的结果:

{
    "kind": "AdmissionReview",
    "apiVersion": "admission.k8s.io/v1",
    "response": {
        "uid": "b955fb34-0135-4e78-908e-eeb2f874933f",
        "allowed": true,
        "status": {
            "metadata": {},
            "code": 200
        },
        "patch": "W3sib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3JlcGxpY2FzIiwidmFsdWUiOjJ9XQ==",
        "patchType": "JSONPatch"
    }
}

这里的 patch 是用 base64 编码的一个 json,我们解码看下结果:

$ echo "W3sib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3JlcGxpY2FzIiwidmFsdWUiOjJ9XQ==" | base64 -d
[
    {
        "op": "replace",
        "path": "/spec/replicas",
        "value": 2
    }
]

编写 webhook 服务

本次我们约定实现以下逻辑:
针对 deployment 资源

  1. replicas 副本数最大为 3,如果超过 3 副本,则强制修改为 3。
  2. 所有新增或更新的 deployment 资源,添加 env-type=test的 annotation。
package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "net/http"

    "github.com/gin-gonic/gin"
    log "github.com/sirupsen/logrus"
    admissionv1 "k8s.io/api/admission/v1"
    appsv1 "k8s.io/api/apps/v1"
)

var certFile string
var keyFile string

var (
    scheme = runtime.NewScheme()
    codecs = serializer.NewCodecFactory(scheme)
)

func main() {
    flag.StringVar(&certFile, "certFile", "", "the webhook server certFile")
    flag.StringVar(&keyFile, "keyFile", "", "the webhook server keyFile")
    flag.Parse()

    router := gin.Default()
    router.POST("/mutating-demo", mutatingDeployment)

    // 启动HTTPS服务器
    err := router.RunTLS(":8003", certFile, keyFile)
    if err != nil {
        log.Fatal("Failed to start HTTPS server: ", err)
    }
}

func mutatingDeployment(c *gin.Context) {
    body, err := io.ReadAll(c.Request.Body)
    if err != nil {
        log.Errorf("Failed to read request body: %v", err)
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to read request body"})
        return
    }

    //解析 AdmissionReview 对象,也可以用Golang原生的JSON序列化器(encoding/json包),有兴趣网上可以查一下两种编解码器的区别
    admissionReview := &admissionv1.AdmissionReview{}
    if _,_,err := codecs.UniversalDeserializer().Decode(body,nil, admissionReview);err != nil{
            log.Errorf("Failed to parse AdmissionReview: %v", err)
            c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse AdmissionReview"})
            return
    }

    // 获取 deployment 对象
    deployment := &appsv1.Deployment{}
    rawObject := admissionReview.Request.Object.Raw
    if _,_,err := codecs.UniversalDeserializer().Decode(rawObject,nil ,deployment); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse Deployment"})
        return
    }
    // 任务 1:给所有 deployment 资源加上 env-type=test 的 annotation
    annotationPatch,err := setAnnotationPatch(deployment)
    if err != nil{
        log.Errorf("Failed to set Deployment annotations: %v", err)
        c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to set Deployment annotations"})
        return
    }

    // 任务 2:验证 deployment 的副本数,如果大于 3 则修改为 3, 也就是副本数最大不能超过 3。
    if *deployment.Spec.Replicas > 3 {
        // 修改副本数为 1
        deployment.Spec.Replicas = new(int32)
        *deployment.Spec.Replicas = 3

        deploymentPatch := setReplicasPatch(deployment)
        // 构建允许相应
        finalPatch, err := mergePatches(deploymentPatch,annotationPatch)
        if err != nil{
            log.Errorf("merge patch failed: %v", err)
            c.JSON(http.StatusBadRequest, gin.H{"error": "merge patch failed"})
            return
        }

        admissionReview.Response = &admissionv1.AdmissionResponse{
            Allowed: true,
            Patch:   finalPatch,
            PatchType: func() *admissionv1.PatchType {
                pt := admissionv1.PatchTypeJSONPatch
                return &pt
            }(),
        }
    } else {
        // 构建允许响应
        admissionReview.Response = &admissionv1.AdmissionResponse{
            Allowed: true,
            Patch:   annotationPatch,
            PatchType: func() *admissionv1.PatchType {
                pt := admissionv1.PatchTypeJSONPatch
                return &pt
            }(),
        }
    }
    // 设置相应 uid 和 api 版本
    admissionReview.Response.UID = admissionReview.Request.UID

    // 构建 AdmissionReview 响应
    responseBody, err := json.Marshal(admissionReview)
    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal AdmissionReview response"})
        return
    }

    // 发送 AdmissionReview 响应
    c.Data(http.StatusOK, "application/json", responseBody)
}

func setReplicasPatch(deployment *appsv1.Deployment) []byte {
    patch := []map[string]interface{}{
        {
            "op":    "replace",
            "path":  "/spec/replicas",
            "value": *deployment.Spec.Replicas,
        },
    }
    patchBytes, err := json.Marshal(patch)
    if err != nil {
        log.Error("Failed to marshal patch:", err)
    }
    return patchBytes
}

func setAnnotationPatch(deployment *appsv1.Deployment) ([]byte, error) {
    annotationKey := "env-type"
    annotationValue := "test"

    annotations := deployment.ObjectMeta.GetAnnotations()
    _, exists := annotations[annotationKey]

    var patch []map[string]interface{}
    if exists {
        // 如果annotation存在,则使用"replace"操作
        patch = []map[string]interface{}{
            {
                "op":    "replace",
                "path":  "/metadata/annotations/" + annotationKey,
                "value": annotationValue,
            },
        }
    } else {
        // 如果annotation不存在,则使用"add"操作
        patch = []map[string]interface{}{
            {
                "op":    "add",
                "path":  "/metadata/annotations",
                "value": map[string]string{annotationKey: annotationValue},
            },
        }
    }

    patchBytes, err := json.Marshal(patch)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal patch: %v", err)
    }

    return patchBytes, nil
}

func mergePatches(patches ...[]byte) ([]byte, error) {
    mergedPatch := make([]map[string]interface{}, 0)

    for _, patch := range patches {
        var patchObj []map[string]interface{}
        if err := json.Unmarshal(patch, &patchObj); err != nil {
            return nil, fmt.Errorf("failed to unmarshal patch: %v", err)
        }

        mergedPatch = append(mergedPatch, patchObj...)
    }

    mergedPatchBytes, err := json.Marshal(mergedPatch)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal merged patch: %v", err)
    }

    return mergedPatchBytes, nil
}

部署 wehhook 服务到k8s集群

根据k8s webhook文档描述,k8s apiserver 通过 https post 访问 webhook server,就是说 webhook server 必须监听在 https 协议之上。
要使用 https 协议,有 3 个东西是必需的:ca,key,cert

  • apiserver 在访问 webhook server 的时候,要指定 ca 证书
  • webhook server 在启动的时候,要加载 key 和 cert。
    社区推荐使用 cert-manager 来为 webhook 提供证书管理服务。

cert-mangager的几个概念

  1. Issuers
    Issuers 是 cert-manager 中的一个自定义资源,相当于证书颁发机构,用于生成证书。
  2. Certificate
    Certificate 是 cert-manager中的一个自定义资源, 定义证书的信息。
    cert-manager 会根据certificate的定义结合issuer来创建真正的证书对(cert-pair)
  3. CA Injecter
    CA Injector 是cert-manager的一个controller. 它可以将caBundle 字段写入到这三个K8S Resource中:
    ValidatingWebhookConfiguration
    MutatingWebhookConfiguration
    CustomResourceDefinition
    要使用CA Injector, 需要在webhook的yaml中添加 annotation: cert-manager.io/inject-ca-from 告诉CA Injector, 要将哪一个证书写到cabundle中.

创建 Issuer

通常来讲, 为webhook server颁发证书使用自签名证书即可,因为除了k8s apiserver,没有其它外部服务会去访问 webhook server。
issuer.yaml

apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: selfsigned-issuer
  namespace: default
spec:
  selfSigned: {}

创建到k8s集群中

k apply -f issuer.yaml

➜  ~ k get issuers -owide
NAME                READY   STATUS   AGE
selfsigned-issuer   True             21h

创建 Certificate

创建 certificate 需要指定要创建的 dnsname 和 issuerref,还需要指定 secretName,表示证书存放在哪个 secret 中,用于部署 webhook server的时候挂在证书到容器中。
cert.yaml

apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: mutating-demo
  namespace: default
spec:
  dnsNames:
    - mutating-demo.default.svc
    - mutating-demo.default.svc.cluster.local
    - mutating-demo.example.com
  issuerRef:
    kind: Issuer
    name: selfsigned-issuer
  secretName: mutating-demo

创建到k8s集群中

k apply -f cert.yaml
➜  ~ k get certificate -owide
NAME            READY   SECRET          ISSUER              STATUS                                          AGE
mutating-demo   True    mutating-demo   selfsigned-issuer   Certificate is up to date and has not expired   21h

创建 mutatingwehbookconfiguration

mutatingwehbookconfiguration.yaml

apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: mutating-demo
  annotations:
    cert-manager.io/inject-ca-from: default/mutating-demo
webhooks:
  - admissionReviewVersions:
    - v1beta1
    - v1
    clientConfig:
      service:
        name: mutating-demo
        namespace: default
        path: /mutating-demo
        port: 8003
    name: mutating-demo.example.com
    failurePolicy: Fail
    matchPolicy: Equivalent
    rules:
      - apiGroups:
          - apps
        apiVersions:
          - v1
        operations:
          - CREATE
          - UPDATE
        resources:
          - deployments
    sideEffects: None
    timeoutSeconds: 10

创建到k8s集群中

k apply -f mutatingwehbookconfiguration.yaml


(base) ➜  ~ k get mutatingwebhookconfiguration mutating-demo -oyaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
...
webhooks:
- admissionReviewVersions:
  - v1beta1
  - v1
  clientConfig:
    caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FU.....
    service:
      name: mutating-demo
      namespace: default
      path: /mutating-demo
...

可以看到caBundle已经被自动填充进去了。

创建wenhook server deployment

webhookserver.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mutating-demo-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mutating-demo
  template:
    metadata:
      labels:
        app: mutating-demo
    spec:
      containers:
        - name: mutating-demo-container
          image: jiangzhiheng/mutating-demo:v0.1-202402182014
          ports:
            - containerPort: 8080
          volumeMounts:
            - name: secret-volume
              mountPath: /path/to/secret
              readOnly: true
      volumes:
        - name: secret-volume
          secret:
            secretName: mutating-demo
---
apiVersion: v1
kind: Service
metadata:
  name: mutating-demo
  namespace: default
spec:
  selector:
    app: mutating-demo
  ports:
    - protocol: TCP
      port: 8003
      targetPort: 8003
  type: ClusterIP

创建到 k8s 集群中

(base) ➜  ~ kubectl get svc,deployment,pod |grep mutating
service/mutating-demo        ClusterIP   10.110.159.147   <none>        8003/TCP                     21h
deployment.apps/mutating-demo-deployment          1/1     1            1           21h
pod/mutating-demo-deployment-f875b7c7c-cwh55           1/1     Running   0          21h

验证

kubectl create deployment nginx --image=nginx --replicas=5

➜  ~ k get deploy  nginx -oyaml
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
    env-type: test
  creationTimestamp: "2024-02-18T12:23:48Z"
  generation: 1
  labels:
    app: nginx
  name: nginx
  namespace: default
  resourceVersion: "69379139"
  uid: 656709c3-d7d1-49d4-a7d4-f32df87f5dea
spec:
  progressDeadlineSeconds: 600
  replicas: 3
  revisionHistoryLimit: 10
...

可以看到副本已经被修改为 3,并且env-type: test的 annotations 也已经正常被添加。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容