动态配置admission webhook举例(详情见官方文档:https://kubernetes.io/zh/docs/reference/access-authn-authz/extensible-admission-controllers/):
# Example ValidatingWebhookConfiguration: on CREATE of namespaced pods, the
# API server calls the service example-namespace/example-service.
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: "pod-policy.example.com"
webhooks:
- name: "pod-policy.example.com"
  # Requests matching these rules are sent to the webhook.
  rules:
  - apiGroups: [""]
    apiVersions: ["v1"]
    operations: ["CREATE"]
    resources: ["pods"]
    scope: "Namespaced"
  # Where the webhook is served and the CA bundle used to verify it.
  clientConfig:
    service:
      namespace: "example-namespace"
      name: "example-service"
    caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"
  admissionReviewVersions: ["v1", "v1beta1"]
  sideEffects: None
  timeoutSeconds: 5
一、初始化
kube-apiserver在调用NewServerRunOptions函数初始化options的时候,调用了NewAdmissionOptions去初始化了AdmissionOptions,并注册了内置的admission插件和webhook admission插件。
// NewServerRunOptions creates a new ServerRunOptions object with default parameters.
// Abridged excerpt: only the admission, authentication and authorization
// option fields are shown; the other fields are omitted.
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
// omitted ...
// Initialize the admission options.
Admission: kubeoptions.NewAdmissionOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),
Authorization: kubeoptions.NewBuiltInAuthorizationOptions(),
// omitted ...
}
// ...
return &s
}
NewAdmissionOptions里面先是调用genericoptions.NewAdmissionOptions创建一个AdmissionOptions,genericoptions.NewAdmissionOptions在创建的同时也注册了lifecycle、validatingwebhook、mutatingwebhook这三个插件。然后再调用RegisterAllAdmissionPlugins注册内置的其他admission插件。
// pkg/kubeapiserver/options/admission.go
// NewAdmissionOptions returns the kube-apiserver admission options. It starts
// from the generic apiserver admission options, registers the built-in
// admission plugins into the same plugin registry, and then overrides the
// recommended plugin order and the default-off plugin set.
func NewAdmissionOptions() *AdmissionOptions {
	// Start from the generic options created by the apiserver library.
	genericOpts := genericoptions.NewAdmissionOptions()

	// Register the built-in admission plugins into the generic registry.
	RegisterAllAdmissionPlugins(genericOpts.Plugins)

	// Override the plugin ordering and the set of plugins that are off by default.
	genericOpts.RecommendedPluginOrder = AllOrderedPlugins
	genericOpts.DefaultOffPlugins = DefaultOffAdmissionPlugins()

	wrapped := &AdmissionOptions{}
	wrapped.GenericAdmission = genericOpts
	return wrapped
}
webhook的validating、mutating插件的注册是在genericoptions.NewAdmissionOptions中完成的:它调用的server.RegisterAllAdmissionPlugins注册了lifecycle、validatingwebhook、mutatingwebhook这三个插件。
// staging/src/k8s.io/apiserver/pkg/server/options/admission.go
// NewAdmissionOptions returns the generic apiserver admission options: a fresh
// plugin registry, the controller-metrics decorator, a recommended plugin
// order, and an empty default-off set. The lifecycle, validatingwebhook and
// mutatingwebhook plugins are registered into the registry before returning.
func NewAdmissionOptions() *AdmissionOptions {
	registry := admission.NewPlugins()
	metricsDecorator := admission.DecoratorFunc(admissionmetrics.WithControllerMetrics)

	// The order mixes mutating and validating plugins. The apiserver always
	// runs the validating ones after all the mutating ones, so their
	// relative position here does not matter.
	pluginOrder := []string{lifecycle.PluginName, mutatingwebhook.PluginName, validatingwebhook.PluginName}

	opts := &AdmissionOptions{
		Plugins:                registry,
		Decorators:             admission.Decorators{metricsDecorator},
		RecommendedPluginOrder: pluginOrder,
		DefaultOffPlugins:      sets.NewString(),
	}

	// Registers lifecycle, validatingwebhook and mutatingwebhook.
	server.RegisterAllAdmissionPlugins(registry)
	return opts
}
// staging/src/k8s.io/apiserver/pkg/server/plugins.go
// RegisterAllAdmissionPlugins registers all admission plugins provided by the
// generic apiserver into the given plugin registry.
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
lifecycle.Register(plugins) // namespace lifecycle plugin
validatingwebhook.Register(plugins) // validating webhook plugin
mutatingwebhook.Register(plugins) // mutating webhook plugin
}
二、Admission Plugins在kube-apiserver请求处理链中的位置
kube-apiserver在cmd/kube-apiserver/app/server.go.buildGenericConfig()中根据ServerOptions生成GenericConfig。
前面已经分析AdmissionPlugin注册到ServerRunOptions的过程, buildGenericConfig中会调用ServerRunOptions.Admission.ApplyTo生成admission chain设置到GenericConfig里面。把所有的admission plugin生成chainAdmissionHandler对象,其实就是plugin数组,这个类的Admit、Validate等方法会遍历调用每个plugin的Admit、Validate方法
GenericConfig.AdmissionControl 又会赋值给GenericAPIServer.admissionControl
// ApplyTo builds the admission chain from the enabled plugins and stores it,
// wrapped with step metrics, on c.AdmissionControl. It returns an error if the
// plugin configuration cannot be read, the clientset cannot be created, or the
// chain cannot be built. Abridged excerpt: the leading statements are omitted.
func (a *AdmissionOptions) ApplyTo(
c *server.Config,
informers informers.SharedInformerFactory,
kubeAPIServerClientConfig *rest.Config,
features featuregate.FeatureGate,
pluginInitializers ...admission.PluginInitializer,
) error {
// omitted ...
// Collect the names of all enabled plugins.
pluginNames := a.enabledPluginNames()
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
}
clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
if err != nil {
return err
}
// The generic initializer is appended after the caller-supplied initializers.
genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)
initializersChain := admission.PluginInitializers{}
pluginInitializers = append(pluginInitializers, genericInitializer)
initializersChain = append(initializersChain, pluginInitializers...)
// Build the admission chain from the enabled plugins.
admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
if err != nil {
return err
}
// Store the chain (wrapped with step metrics) on the server config.
c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
return nil
}
Admission Plugin是在kube-apiserver处理完前面的handler之后,在调用RESTStorage的Get、Create、Update、Delete等函数前会调用Admission Plugin。
kube-apiserver有很多的handler组成了handler链,这些handler链的最内层,是使用go-restful框架注册的WebService。每个WebService都对应一种资源的RESTStorage,比如NodeStorage(pkg/registry/core/node/storage/storage.go),installAPIResources初始化WebService时,会把RESTStorage的Get、Create、Update等函数分别封装成GET、POST、PUT等http方法的handler注册到WebService中。
比如把Update函数封装成http handler 作为PUT方法的handler,而在这个handler调用Update函数之前,会先调用Admission Plugin的Admit、Validate等函数。下面看个PUT方法的例子。
a.group.Admit是从GenericAPIServer.admissionControl取的值,就是前面ApplyTo函数生成的admissionChain。PUT方法的handler就是以admit、updater作为参数调用restfulUpdateResource函数生成的。
// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
admit := a.group.Admit
// 省略 ...
updater, isUpdater := storage.(rest.Updater)
// 省略 ...
switch action.Verb {
case "GET": ...
case "PUT": // Update a resource.
doc := "replace the specified " + kind
if isSubresource {
doc = "replace " + subresource + " of the specified " + kind
}
// admit、updater作为参数调用restfulUpdateResource函数生成的handler
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))
route := ws.PUT(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("replace"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
// TODO: in some cases, the API may return a v1.Status instead of the versioned object
// but currently go-restful can't handle multiple different objects being returned.
Returns(http.StatusCreated, "Created", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedUpdateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
case "PARTCH": ...
// 省略 ....
}
}
看restfulUpdateResource的实现没啥详细内容,就是调用了 handlers.UpdateResource。
// restfulUpdateResource adapts handlers.UpdateResource to a go-restful route
// function. On every request it builds the update handler for the given
// updater, request scope and admission handler, and invokes it with the
// underlying response writer and HTTP request.
func restfulUpdateResource(r rest.Updater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
	return func(request *restful.Request, response *restful.Response) {
		updateHandler := handlers.UpdateResource(r, &scope, admit)
		updateHandler(response.ResponseWriter, request.Request)
	}
}
看handlers.UpdateResource的代码实现,会先判断如果传入的admission.Interface参数是MutationInterface类型,就把对Admit(也就是admissionChain的Admit)的调用封装成一个transformer,随后通过rest.DefaultUpdatedObjectInfo传给RESTStorage的Update函数;admissionChain的Admit最终会遍历调用每个Admission Plugin的Admit方法。而Webhook Admission是众多admission中的一个。
transformer和校验函数都准备好之后,后面的requestFunc才会调用RESTStorage的Update函数。每个资源的RESTStorage最终都是要调用ETCD3Storage的Get、Update等函数。
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go
// UpdateResource returns an HTTP handler that updates an object through the
// given rest.Updater, wiring the admission handler into the update. Abridged
// excerpt: request decoding and other setup (ctx, obj, options, transformers,
// ...) are omitted.
func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// omitted ...
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)
admit = admission.WithAudit(admit, ae)
// If admit implements MutationInterface, wrap its Admit call in a transformer.
// The transformer is not run here; it is handed to r.Update below through
// rest.DefaultUpdatedObjectInfo.
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
// An old object without a UID is admitted as a Create, otherwise as an Update.
isNotZeroObject, err := hasUID(oldObj)
if err != nil {
return nil, fmt.Errorf("unexpected error when extracting UID from oldObj: %v", err.Error())
} else if !isNotZeroObject {
if mutatingAdmission.Handles(admission.Create) {
return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope)
}
} else {
if mutatingAdmission.Handles(admission.Update) {
return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope)
}
}
// The admission handler does not handle this operation: pass the object through.
return newObj, nil
})
}
// omitted ...
// requestFunc calls the storage's Update with the transformers and the
// validation callbacks derived from admit.
requestFunc := func() (runtime.Object, error) {
obj, created, err := r.Update(
ctx,
name,
rest.DefaultUpdatedObjectInfo(obj, transformers...),
// Create-validation callback built from admit and wrapped with an
// authorization check; presumably runs the validating admission chain
// (confirm against rest.AdmissionToValidateObjectFunc).
withAuthorization(rest.AdmissionToValidateObjectFunc(
admit,
admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope),
scope.Authorizer, createAuthorizerAttributes),
// Update-validation callback built from admit; presumably runs the
// validating admission chain (confirm against
// rest.AdmissionToValidateObjectUpdateFunc).
rest.AdmissionToValidateObjectUpdateFunc(
admit,
admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope),
false,
options,
)
wasCreated = created
return obj, err
}
// Run the update through finishRequest with the request timeout.
result, err := finishRequest(timeout, func() (runtime.Object, error) {
result, err := requestFunc()
// omitted ...
return result, err
})
// ...
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
}
}
以上是PUT方法的例子,里面调用了MutationInterface和ValidationInterface。其他的方法比如POST、DELETE等也是类似。但是GET方法不会调用Admission Plugin。
三、Webhook Admission 调用
validatingwebhook和mutatingwebhook分别位于staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin.go,staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go两个文件中。
3.1 ValidatingAdmissionWebhook调用
1.ValidatingAdmissionWebhook的Validate()函数实现了ValidationInterface接口,有请求到来时kube-apiserver会调用所有admission 的Validate()方法。ValidatingAdmissionWebhook持有了一个Webhook对象,Validate()会调用Webhook.Dispatch()。
2.Webhook.Dispatch()又调用了其持有的dispatcher的Dispatch()方法。dispatcher是通过dispatcherFactory创建的,dispatcherFactory是ValidatingAdmissionWebhook创建generic.Webhook时候传入的newValidatingDispatcher函数。调用dispatcherFactory函数创建的实际上是validatingDispatcher对象,也就是Webhook.Dispatch()调用的是validatingDispatcher.Dispatch()。
3.validatingDispatcher.Dispatch()会为每个匹配的webhook启动一个goroutine,并行地远程调用注册的webhook plugin,并等待它们全部返回。
NewValidatingAdmissionWebhook初始化了ValidatingAdmissionWebhook对象,内部持有了一个generic.Webhook对象,generic.Webhook是一个Validate和mutate共用的框架,创建generic.Webhook时需要一个dispatcherFactory函数,用这个函数生成dispatcher对象。
// staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/plugin.go
// NewValidatingAdmissionWebhook returns a generic admission webhook plugin.
// The plugin handles CONNECT, CREATE, DELETE and UPDATE operations and wraps a
// generic.Webhook built with the validating webhook configuration manager and
// a dispatcher factory bound to the plugin itself.
func NewValidatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {
	var (
		operations = admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)
		validator  = &Plugin{}
		err        error
	)

	// The dispatcher factory needs the plugin, so the plugin is allocated
	// before its Webhook field is filled in.
	if validator.Webhook, err = generic.NewWebhook(operations, configFile, configuration.NewValidatingWebhookConfigurationManager, newValidatingDispatcher(validator)); err != nil {
		return nil, err
	}
	return validator, nil
}
// Validate makes an admission decision based on the request attributes.
// It delegates to the Dispatch method of the plugin's Webhook and returns
// whatever error that call produces.
func (a *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
return a.Webhook.Dispatch(ctx, attr, o)
}
调用generic.Webhook.Dispatch()时会调用dispatcher对象的Dispatch。
// Dispatch is called by the downstream Validate or Admit methods.
//
// Requests for which rules.IsWebhookConfigurationResource reports true are
// admitted without calling any webhook. If the webhook is not ready, the
// request is rejected with a Forbidden error. Otherwise the current webhooks
// from the hook source are handed to the configured dispatcher.
func (a *Webhook) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
	switch {
	case rules.IsWebhookConfigurationResource(attr):
		return nil
	case !a.WaitForReady():
		notReady := fmt.Errorf("not yet ready to handle request")
		return admission.NewForbidden(attr, notReady)
	default:
		return a.dispatcher.Dispatch(ctx, attr, o, a.hookSource.Webhooks())
	}
}
validatingDispatcher.Dispatch遍历所有的hooks,找到相关的webhooks,然后执行callHook调用外部注册进来的webhook。
// Dispatch selects the hooks that apply to the request, calls each of them in
// its own goroutine, waits for all calls to finish and returns the first
// collected error (nil if every hook admitted the request or no hook matched).
// Call failures of hooks whose FailurePolicy is Ignore are logged and dropped.
func (d *validatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
var relevantHooks []*generic.WebhookInvocation
// Construct all the versions we need to call our webhooks
versionedAttrs := map[schema.GroupVersionKind]*generic.VersionedAttributes{}
for _, hook := range hooks {
// ShouldCallHook decides whether this hook applies to the request;
// a nil invocation means the hook is not relevant.
invocation, statusError := d.plugin.ShouldCallHook(hook, attr, o)
if statusError != nil {
return statusError
}
if invocation == nil {
continue
}
relevantHooks = append(relevantHooks, invocation)
// If we already have this version, continue
if _, ok := versionedAttrs[invocation.Kind]; ok {
continue
}
versionedAttr, err := generic.NewVersionedAttributes(attr, invocation.Kind, o)
if err != nil {
return apierrors.NewInternalError(err)
}
versionedAttrs[invocation.Kind] = versionedAttr
}
if len(relevantHooks) == 0 {
// no matching hooks
return nil
}
// Check if the request has already timed out before spawning remote calls
select {
case <-ctx.Done():
// parent context is canceled or timed out, no point in continuing
return apierrors.NewTimeoutError("request did not complete within requested timeout", 0)
default:
}
wg := sync.WaitGroup{}
// Buffered to one slot per hook: each goroutine sends at most one error,
// so sends never block.
errCh := make(chan error, len(relevantHooks))
wg.Add(len(relevantHooks))
for i := range relevantHooks {
go func(invocation *generic.WebhookInvocation) {
defer wg.Done()
hook, ok := invocation.Webhook.GetValidatingWebhook()
if !ok {
utilruntime.HandleError(fmt.Errorf("validating webhook dispatch requires v1.ValidatingWebhook, but got %T", hook))
return
}
versionedAttr := versionedAttrs[invocation.Kind]
t := time.Now()
// Each relevant hook is called from its own goroutine, so the
// remote calls run concurrently.
err := d.callHook(ctx, hook, invocation, versionedAttr)
// With FailurePolicy Ignore, a failure to call the webhook does not
// reject the request.
ignoreClientCallFailures := hook.FailurePolicy != nil && *hook.FailurePolicy == v1.Ignore
rejected := false
// Classify the error for the rejection metrics.
if err != nil {
switch err := err.(type) {
case *webhookutil.ErrCallingWebhook:
if !ignoreClientCallFailures {
rejected = true
admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionCallingWebhookError, 0)
}
case *webhookutil.ErrWebhookRejection:
rejected = true
admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionNoError, int(err.Status.ErrStatus.Code))
default:
rejected = true
admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionAPIServerInternalError, 0)
}
}
admissionmetrics.Metrics.ObserveWebhook(time.Since(t), rejected, versionedAttr.Attributes, "validating", hook.Name)
if err == nil {
return
}
// Call failure: fail open (log only) or fail closed (report an
// internal error), depending on the failure policy.
if callErr, ok := err.(*webhookutil.ErrCallingWebhook); ok {
if ignoreClientCallFailures {
klog.Warningf("Failed calling webhook, failing open %v: %v", hook.Name, callErr)
utilruntime.HandleError(callErr)
return
}
klog.Warningf("Failed calling webhook, failing closed %v: %v", hook.Name, err)
errCh <- apierrors.NewInternalError(err)
return
}
// Explicit rejection: report the status carried by the rejection.
if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {
err = rejectionErr.Status
}
klog.Warningf("rejected by webhook %q: %#v", hook.Name, err)
errCh <- err
}(relevantHooks[i])
}
// Wait for every goroutine to finish, then drain the collected errors.
wg.Wait()
close(errCh)
var errs []error
for e := range errCh {
errs = append(errs, e)
}
if len(errs) == 0 {
return nil
}
if len(errs) > 1 {
for i := 1; i < len(errs); i++ {
// TODO: merge status errors; until then, just return the first one.
utilruntime.HandleError(errs[i])
}
}
return errs[0]
}
3.2 MutatingAdmissionWebhook调用
看MutatingWebhook的构造函数就可以看到,MutatingWebhook和ValidatingWebhook的代码架构是一样的,只不过在创建generic.Webhook的时候传入的dispatcherFactory函数是newMutatingDispatcher,所以Webhook.Dispatch()最终调用的就是mutatingDispatcher.Dispatch(),这个和validatingDispatcher.Dispatch的实现逻辑基本是一样的,也是根据WebhookConfiguration中的rules是否匹配找到相关的webhooks,然后逐个调用。
// staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go
// NewMutatingWebhook returns a generic admission webhook plugin.
// The plugin handles CONNECT, CREATE, DELETE and UPDATE operations and wraps a
// generic.Webhook built with the mutating webhook configuration manager and a
// dispatcher factory bound to the plugin itself.
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {
	var (
		operations = admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)
		mutator    = &Plugin{}
		err        error
	)

	// The dispatcher factory needs the plugin, so the plugin is allocated
	// before its Webhook field is filled in.
	if mutator.Webhook, err = generic.NewWebhook(operations, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(mutator)); err != nil {
		return nil, err
	}
	return mutator, nil
}
// ValidateInitialization implements the InitializationValidator interface.
// The check is forwarded to the plugin's Webhook and its result is returned
// unchanged.
func (a *Plugin) ValidateInitialization() error {
	return a.Webhook.ValidateInitialization()
}
// Admit makes an admission decision based on the request attributes.
// It delegates to the Dispatch method of the plugin's Webhook and returns
// whatever error that call produces.
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
return a.Webhook.Dispatch(ctx, attr, o)
}