引言
在实际的 kubernetes 使用场景下,我们可能需要对提交到集群的任务进行自定义的准入控制,例如对特定的任务添加指定的label 或者 annotations,再比如需要限制提交任务的资源配比,亦或是为了实现安全管控从而拒绝特权容器等等。
我们不妨考虑 kubernetes 的准入控制特性来实现我们的需求。
Admission Webhook
Admission webhook 是k8s apiserver对外提供的一种扩展能力,在 kubernetes apiserver 中包含两个特殊的准入控制器: MutatingAdmissionWebhook
和 ValidatingAdmissionWebhook
。这两个控制器将发送准入请求到外部的 HTTP 回调服务并接收一个准入响应。通过自己实现这个外部的 http 回调服务,我们可以实现对k8s原生或者自定义控制器的行为控制,比如你想阻止某个控制器的行为,或拦截某个控制器的资源修改等。
实现步骤
本文以 mutatingwebhook为例
定义一个mutatingwebhookconfiguration 资源
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: mutating-demo
annotations:
cert-manager.io/inject-ca-from: default/mutating-demo
webhooks:
- admissionReviewVersions: # admissionReviewVersions 请求的版本
- v1beta1
- v1
clientConfig: # 客户端配置
caBundle: # ca证书, 用于 apiserver 访问 webhook 服务时携带,本文我们通过 cert-manager 的cainjector来实现自动注入,可以不需要此字段
service: # 调用服务相关配置,这里是一个k8s的service,访问地址是<name>.<namespace>.svc:<port>/<path>
name: mutating-demo
namespace: default
path: /mutating-demo
port: 8003
name: mutating-demo.example.com # webhook名称
failurePolicy: Fail # 调用失败策略,Ignore为忽略错误, failed表示admission会处理错误
matchPolicy: Equivalent
rules: # 规则
- apiGroups:
- apps
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- deployments
scope: '*' # 匹配范围,"*" 匹配所有资源,但不包括子资源,"*/*" 匹配所有资源,包括子资源
sideEffects: None # 这个表示webhook是否存在副作用,主要针对 dryRun 的请求
timeoutSeconds: 10
# reinvocationPolicy表示再调度策略,因为webhook本身没有顺序性,因此每个修改后可能又被其他webhook修改,所以提供
# 一个策略表示是否需要被多次调用,Never 表示只会调度一次,IfNeeded 表示资源被修改后会再调度这个webhook
reinvocationPolicy: Never
Webhook 执行流程
Admission Webhook 本质是一个 apiserver 的 webhook 调用,如下是 apiserver 的处理流程:
apiserver 通过读取 mutatingwebhookconfiguration
和 validatingwebhookconfiguration
资源文件的目标地址,然后回调我们自定义的服务。
┌──────────────────────────────────┐
┌─────────────────┐ │ │
apply │ │ read │ validatingwebhookconfiguration │
────────────►│ api-server │◄───────────┤ │
│ │ │ mutatingwebhookconfiguration │
└────────┬────────┘ │ │
│ └──────────────────────────────────┘
│
│ 回调
│
│
┌────────▼────────┐
│ │
│ webhookservice │
│ │
└─────────────────┘
api-server 发起的请求是一串json数据格式,header需要设置content-type为application/json, 我们看看请求的 body :
curl -X POST \
http://webhook-url \
-H 'content-type: application/json' \
-d '{
"apiVersion": "admission.k8s.io/v1",
"kind": "AdmissionReview",
"request": {
...
}
}'
返回给 apiserver 的结果:
{
"kind": "AdmissionReview",
"apiVersion": "admission.k8s.io/v1",
"response": {
"uid": "b955fb34-0135-4e78-908e-eeb2f874933f",
"allowed": true,
"status": {
"metadata": {},
"code": 200
},
"patch": "W3sib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3JlcGxpY2FzIiwidmFsdWUiOjJ9XQ==",
"patchType": "JSONPatch"
}
}
这里的 patch 是用 base64 编码的一个 json,我们解码看下结果:
$ echo "W3sib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3JlcGxpY2FzIiwidmFsdWUiOjJ9XQ==" | base64 -d
[
{
"op": "replace",
"path": "/spec/replicas",
"value": 2
}
]
编写 webhook 服务
本次我们约定实现以下逻辑:
针对 deployment 资源
- replicas 副本数最大为 3,如果超过 3 副本,则强制修改为 3。
- 所有新增或更新的 deployment 资源,添加 env-type=test的 annotation。
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"net/http"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
admissionv1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
)
var certFile string
var keyFile string
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
func main() {
flag.StringVar(&certFile, "certFile", "", "the webhook server certFile")
flag.StringVar(&keyFile, "keyFile", "", "the webhook server keyFile")
flag.Parse()
router := gin.Default()
router.POST("/mutating-demo", mutatingDeployment)
// 启动HTTPS服务器
err := router.RunTLS(":8003", certFile, keyFile)
if err != nil {
log.Fatal("Failed to start HTTPS server: ", err)
}
}
func mutatingDeployment(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Errorf("Failed to read request body: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to read request body"})
return
}
//解析 AdmissionReview 对象,也可以用Golang原生的JSON序列化器(encoding/json包),有兴趣网上可以查一下两种编解码器的区别
admissionReview := &admissionv1.AdmissionReview{}
if _,_,err := codecs.UniversalDeserializer().Decode(body,nil, admissionReview);err != nil{
log.Errorf("Failed to parse AdmissionReview: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse AdmissionReview"})
return
}
// 获取 deployment 对象
deployment := &appsv1.Deployment{}
rawObject := admissionReview.Request.Object.Raw
if _,_,err := codecs.UniversalDeserializer().Decode(rawObject,nil ,deployment); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse Deployment"})
return
}
// 任务 1:给所有 deployment 资源加上 env-type=test 的 annotation
annotationPatch,err := setAnnotationPatch(deployment)
if err != nil{
log.Errorf("Failed to set Deployment annotations: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to set Deployment annotations"})
return
}
// 任务 2:验证 deployment 的副本数,如果大于 3 则修改为 3, 也就是副本数最大不能超过 3。
if *deployment.Spec.Replicas > 3 {
// 修改副本数为 1
deployment.Spec.Replicas = new(int32)
*deployment.Spec.Replicas = 3
deploymentPatch := setReplicasPatch(deployment)
// 构建允许相应
finalPatch, err := mergePatches(deploymentPatch,annotationPatch)
if err != nil{
log.Errorf("merge patch failed: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "merge patch failed"})
return
}
admissionReview.Response = &admissionv1.AdmissionResponse{
Allowed: true,
Patch: finalPatch,
PatchType: func() *admissionv1.PatchType {
pt := admissionv1.PatchTypeJSONPatch
return &pt
}(),
}
} else {
// 构建允许响应
admissionReview.Response = &admissionv1.AdmissionResponse{
Allowed: true,
Patch: annotationPatch,
PatchType: func() *admissionv1.PatchType {
pt := admissionv1.PatchTypeJSONPatch
return &pt
}(),
}
}
// 设置相应 uid 和 api 版本
admissionReview.Response.UID = admissionReview.Request.UID
// 构建 AdmissionReview 响应
responseBody, err := json.Marshal(admissionReview)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal AdmissionReview response"})
return
}
// 发送 AdmissionReview 响应
c.Data(http.StatusOK, "application/json", responseBody)
}
func setReplicasPatch(deployment *appsv1.Deployment) []byte {
patch := []map[string]interface{}{
{
"op": "replace",
"path": "/spec/replicas",
"value": *deployment.Spec.Replicas,
},
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Error("Failed to marshal patch:", err)
}
return patchBytes
}
func setAnnotationPatch(deployment *appsv1.Deployment) ([]byte, error) {
annotationKey := "env-type"
annotationValue := "test"
annotations := deployment.ObjectMeta.GetAnnotations()
_, exists := annotations[annotationKey]
var patch []map[string]interface{}
if exists {
// 如果annotation存在,则使用"replace"操作
patch = []map[string]interface{}{
{
"op": "replace",
"path": "/metadata/annotations/" + annotationKey,
"value": annotationValue,
},
}
} else {
// 如果annotation不存在,则使用"add"操作
patch = []map[string]interface{}{
{
"op": "add",
"path": "/metadata/annotations",
"value": map[string]string{annotationKey: annotationValue},
},
}
}
patchBytes, err := json.Marshal(patch)
if err != nil {
return nil, fmt.Errorf("failed to marshal patch: %v", err)
}
return patchBytes, nil
}
func mergePatches(patches ...[]byte) ([]byte, error) {
mergedPatch := make([]map[string]interface{}, 0)
for _, patch := range patches {
var patchObj []map[string]interface{}
if err := json.Unmarshal(patch, &patchObj); err != nil {
return nil, fmt.Errorf("failed to unmarshal patch: %v", err)
}
mergedPatch = append(mergedPatch, patchObj...)
}
mergedPatchBytes, err := json.Marshal(mergedPatch)
if err != nil {
return nil, fmt.Errorf("failed to marshal merged patch: %v", err)
}
return mergedPatchBytes, nil
}
部署 wehhook 服务到k8s集群
根据k8s webhook文档描述,k8s apiserver 通过 https post 访问 webhook server,就是说 webhook server 必须监听在 https 协议之上。
要使用 https 协议,有 3 个东西是必需的:ca,key,cert
- apiserver 在访问 webhook server 的时候,要指定 ca 证书
- webhook server 在启动的时候,要加载 key 和 cert。
社区推荐使用 cert-manager 来为 webhook 提供证书管理服务。
cert-mangager的几个概念
- Issuers
Issuers 是 cert-manager 中的一个自定义资源,相当于证书颁发机构,用于生成证书。 - Certificate
Certificate 是 cert-manager中的一个自定义资源, 定义证书的信息。
cert-manager 会根据certificate的定义结合issuer来创建真正的证书对(cert-pair) - CA Injecter
CA Injector 是cert-manager的一个controller. 它可以将caBundle 字段写入到这三个K8S Resource中:
ValidatingWebhookConfiguration
MutatingWebhookConfiguration
CustomResourceDefinition
要使用CA Injector, 需要在webhook的yaml中添加annotation: cert-manager.io/inject-ca-from
告诉CA Injector, 要将哪一个证书写到cabundle中.
创建 Issuer
通常来讲, 为webhook server颁发证书使用自签名证书即可,因为除了k8s apiserver,没有其它外部服务会去访问 webhook server。
issuer.yaml
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
name: selfsigned-issuer
namespace: default
spec:
selfSigned: {}
创建到k8s集群中
k apply -f issuer.yaml
➜ ~ k get issuers -owide
NAME READY STATUS AGE
selfsigned-issuer True 21h
创建 Certificate
创建 certificate 需要指定要创建的 dnsname 和 issuerref,还需要指定 secretName,表示证书存放在哪个 secret 中,用于部署 webhook server的时候挂在证书到容器中。
cert.yaml
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: mutating-demo
namespace: default
spec:
dnsNames:
- mutating-demo.default.svc
- mutating-demo.default.svc.cluster.local
- mutating-demo.example.com
issuerRef:
kind: Issuer
name: selfsigned-issuer
secretName: mutating-demo
创建到k8s集群中
k apply -f cert.yaml
➜ ~ k get certificate -owide
NAME READY SECRET ISSUER STATUS AGE
mutating-demo True mutating-demo selfsigned-issuer Certificate is up to date and has not expired 21h
创建 mutatingwehbookconfiguration
mutatingwehbookconfiguration.yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: mutating-demo
annotations:
cert-manager.io/inject-ca-from: default/mutating-demo
webhooks:
- admissionReviewVersions:
- v1beta1
- v1
clientConfig:
service:
name: mutating-demo
namespace: default
path: /mutating-demo
port: 8003
name: mutating-demo.example.com
failurePolicy: Fail
matchPolicy: Equivalent
rules:
- apiGroups:
- apps
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- deployments
sideEffects: None
timeoutSeconds: 10
创建到k8s集群中
k apply -f mutatingwehbookconfiguration.yaml
(base) ➜ ~ k get mutatingwebhookconfiguration mutating-demo -oyaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
...
webhooks:
- admissionReviewVersions:
- v1beta1
- v1
clientConfig:
caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FU.....
service:
name: mutating-demo
namespace: default
path: /mutating-demo
...
可以看到caBundle已经被自动填充进去了。
创建wenhook server deployment
webhookserver.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mutating-demo-deployment
spec:
replicas: 1
selector:
matchLabels:
app: mutating-demo
template:
metadata:
labels:
app: mutating-demo
spec:
containers:
- name: mutating-demo-container
image: jiangzhiheng/mutating-demo:v0.1-202402182014
ports:
- containerPort: 8080
volumeMounts:
- name: secret-volume
mountPath: /path/to/secret
readOnly: true
volumes:
- name: secret-volume
secret:
secretName: mutating-demo
---
apiVersion: v1
kind: Service
metadata:
name: mutating-demo
namespace: default
spec:
selector:
app: mutating-demo
ports:
- protocol: TCP
port: 8003
targetPort: 8003
type: ClusterIP
创建到 k8s 集群中
(base) ➜ ~ kubectl get svc,deployment,pod |grep mutating
service/mutating-demo ClusterIP 10.110.159.147 <none> 8003/TCP 21h
deployment.apps/mutating-demo-deployment 1/1 1 1 21h
pod/mutating-demo-deployment-f875b7c7c-cwh55 1/1 Running 0 21h
验证
kubectl create deployment nginx --image=nginx --replicas=5
➜ ~ k get deploy nginx -oyaml
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
deployment.kubernetes.io/revision: "1"
env-type: test
creationTimestamp: "2024-02-18T12:23:48Z"
generation: 1
labels:
app: nginx
name: nginx
namespace: default
resourceVersion: "69379139"
uid: 656709c3-d7d1-49d4-a7d4-f32df87f5dea
spec:
progressDeadlineSeconds: 600
replicas: 3
revisionHistoryLimit: 10
...
可以看到副本已经被修改为 3,并且env-type: test
的 annotations 也已经正常被添加。