Kubernetes Source Code Analysis: API Server

1. The Role of the API Server in the Cluster

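Within the cluster, the API Server sits at the core: it is the bridge through which every component communicates with etcd, and it is the entry point for external requests. It is also responsible for the cluster's secure access control and for tasks such as issuing certificates when a new node joins.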

2. Internal Relationship Diagram of the API Server

inside.png

3. Key Call Chain in the API Server Source Code

kube-apiserver.png

4. Detailed Source Code Walkthrough

1. Component entry point:

Location: k8s.io/kubernetes/cmd/kube-apiserver/apiserver.go

func main() {
    rand.Seed(time.Now().UTC().UnixNano())
    // Initialize the configuration; see step 2
    command := app.NewAPIServerCommand(server.SetupSignalHandler())

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    // The actual execution; see step 3
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}
2. Read the configuration and initialize the default settings

Location: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go

// Initialize the RunOptions structure
s := options.NewServerRunOptions()
// Construct the command object
cmd := &cobra.Command{
        Use: "kube-apiserver",
        Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
        RunE: func(cmd *cobra.Command, args []string) error {
            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())

            // set default options
            completedOptions, err := Complete(s)
            if err != nil {
                return err
            }

            // validate options
            if errs := completedOptions.Validate(); len(errs) != 0 {
                return utilerrors.NewAggregate(errs)
            }

            return Run(completedOptions, stopCh)
        },
    }
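// Register the flags on the command so that the parsed values are assigned to the options, and set the related defaults (etcd, authentication, authorization, and so on).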
s.AddFlags(cmd.Flags())
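3. The Execute call in main ultimately invokes the RunE function constructed in step 2.

Location: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go -> Run
① Set default values and compute the options that must be derived dynamically (Complete)
② Validate the options (Validate)
③ Call Run, which executes CreateServerChain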

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    glog.Infof("Version: %+v", version.Get())

    server, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

    return server.PrepareRun().Run(stopCh)
}
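
The Complete, Validate, Run sequence above can be sketched without cobra. The following is a minimal illustration only: the `Options` fields, the default port value, and the helper `runCommand` are assumptions made for the example and are not taken from the kube-apiserver source.

```go
package main

import (
	"errors"
	"fmt"
)

// Options is a hypothetical, much-reduced stand-in for ServerRunOptions.
type Options struct {
	SecurePort  int
	EtcdServers []string
}

// completedOptions can only be produced by Complete, so Run cannot be
// handed options that skipped the defaulting step.
type completedOptions struct {
	*Options
}

// Complete fills in defaults for fields the user left unset.
func Complete(o *Options) completedOptions {
	if o.SecurePort == 0 {
		o.SecurePort = 6443 // illustrative default
	}
	return completedOptions{o}
}

// Validate collects every problem instead of stopping at the first one.
func (c completedOptions) Validate() []error {
	var errs []error
	if len(c.EtcdServers) == 0 {
		errs = append(errs, errors.New("at least one etcd server is required"))
	}
	if c.SecurePort < 1 || c.SecurePort > 65535 {
		errs = append(errs, fmt.Errorf("invalid secure port %d", c.SecurePort))
	}
	return errs
}

// Run stands in for building and starting the server chain.
func Run(c completedOptions) error {
	fmt.Printf("serving on :%d with etcd %v\n", c.SecurePort, c.EtcdServers)
	return nil
}

// runCommand mirrors the order of calls inside RunE.
func runCommand(o *Options) error {
	c := Complete(o)
	if errs := c.Validate(); len(errs) != 0 {
		return fmt.Errorf("invalid options: %v", errs)
	}
	return Run(c)
}

func main() {
	fmt.Println(runCommand(&Options{}))
	fmt.Println(runCommand(&Options{EtcdServers: []string{"http://127.0.0.1:2379"}}))
}
```

Wrapping the options in an unexported completed type is one way to make the ordering hard to get wrong: Run only accepts values that have already passed through Complete.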
4. CreateServerChain performs all of the API Server's key actions.

Location: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
    nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    if err != nil {
        return nil, err
    }

    // note: the RESTOptionsGetter is initialized here
    kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // If additional API servers are added, they should be gated.
    // note: the RESTOptionsGetter is initialized here
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount)
    if err != nil {
        return nil, err
    }
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, versionedInformers, admissionPostStartHook)
    if err != nil {
        return nil, err
    }

    // otherwise go down the normal path of standing the aggregator up in front of the API server
    // this wires up openapi
    kubeAPIServer.GenericAPIServer.PrepareRun()

    // This will wire up openapi for extension api server
    apiExtensionsServer.GenericAPIServer.PrepareRun()

    // aggregator comes last in the chain
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, versionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
        return nil, err
    }

    if insecureServingOptions != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }

    return aggregatorServer.GenericAPIServer, nil
}
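
CreateServerChain wires three servers together by delegation: the aggregator delegates to the kube-apiserver, which delegates to the extensions server, which delegates to an empty delegate. The sketch below shows that idea with plain net/http handlers. The `server` type, the path prefixes, and the use of a 404 handler as the final delegate are assumptions for illustration, not the actual GenericAPIServer implementation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// server is a hypothetical stand-in for GenericAPIServer: it serves the
// path prefixes it owns and passes every other request to its delegate.
type server struct {
	name     string
	prefixes []string
	delegate http.Handler
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			fmt.Fprintf(w, "handled by %s", s.name)
			return
		}
	}
	s.delegate.ServeHTTP(w, r)
}

// buildChain creates the servers in the same order as CreateServerChain:
// extensions first, then kube-apiserver, then the aggregator in front.
func buildChain() http.Handler {
	notFound := http.NotFoundHandler() // plays the role of the empty delegate
	ext := &server{"apiextensions", []string{"/apis/example.com/"}, notFound}
	kube := &server{"kube-apiserver", []string{"/api/", "/apis/apps/"}, ext}
	agg := &server{"aggregator", []string{"/apis/metrics.example.io/"}, kube}
	return agg
}

// who returns the trimmed response body for a GET on the given path.
func who(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return strings.TrimSpace(rec.Body.String())
}

func main() {
	chain := buildChain()
	paths := []string{"/api/v1/pods", "/apis/example.com/v1/foos", "/apis/metrics.example.io/v1/x", "/nope"}
	for _, p := range paths {
		fmt.Printf("%-32s -> %s\n", p, who(chain, p))
	}
}
```

Only the outermost handler is returned, which matches CreateServerChain returning aggregatorServer.GenericAPIServer.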

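The main actions are as follows:
① Create a dialer from the configuration. It is mainly used to communicate with nodes over SSH, perform health checks, and so on.
Go to CreateNodeDialer: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go -> CreateNodeDialer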


// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
    ····
        healthCheckPath := &url.URL{
            Scheme: "http",
            Host:   net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
            Path:   "healthz",
        }
        nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)

        // Use the nodeTunneler's dialer when proxying to pods, services, and nodes
        proxyDialerFn = nodeTunneler.Dial
    }
    // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
    proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
    proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
        DialContext:     proxyDialerFn,
        TLSClientConfig: proxyTLSClientConfig,
    })
    return nodeTunneler, proxyTransport, nil
}

② Create key objects such as kubeAPIServerConfig and sharedInformers.
Go to CreateKubeAPIServerConfig: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go -> CreateKubeAPIServerConfig

// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(
    s completedServerRunOptions,
    nodeTunneler tunneler.Tunneler,
    proxyTransport *http.Transport,
) (
    config *master.Config,
    sharedInformers informers.SharedInformerFactory,
    versionedInformers clientgoinformers.SharedInformerFactory,
    insecureServingInfo *kubeserver.InsecureServingInfo,
    serviceResolver aggregatorapiserver.ServiceResolver,
    pluginInitializers []admission.PluginInitializer,
    admissionPostStartHook genericapiserver.PostStartHookFunc,
    lastErr error,
) {
    var genericConfig *genericapiserver.Config
    genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport)
    ···
    // storageFactory is similar to a SessionFactory in Java
    storageFactory, lastErr := BuildStorageFactory(s.ServerRunOptions, genericConfig.MergedResourceConfig)
    ···
    // Initialize the master.Config object
    config = &master.Config{
       ···
         }
    return
}

Go to BuildGenericConfig: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go -> BuildGenericConfig

// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(
    s *options.ServerRunOptions,
    proxyTransport *http.Transport,
) (
    genericConfig *genericapiserver.Config,
    sharedInformers informers.SharedInformerFactory,
    versionedInformers clientgoinformers.SharedInformerFactory,
    insecureServingInfo *kubeserver.InsecureServingInfo,
    serviceResolver aggregatorapiserver.ServiceResolver,
    pluginInitializers []admission.PluginInitializer,
    admissionPostStartHook genericapiserver.PostStartHookFunc,
    lastErr error,
) {
    genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    ···
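    // A second storageFactory is created here even though the caller, CreateKubeAPIServerConfig, also creates one.
    // In my view the two could be shared; many objects in this code base are deep-copied internally.
    // The storageFactory is later used to create the Storage for each resource.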
    storageFactory, lastErr := BuildStorageFactory(s, genericConfig.MergedResourceConfig)
    if lastErr != nil {
        return
    }
    // Initialize the RESTOptionsGetter, which is later used to obtain the handles for operating on etcd
    if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
        return
    }

    // Use protobufs for self-communication.
    // Since not every generic apiserver has to support protobufs, we
    // cannot default to it in generic apiserver and need to explicitly
    // set it in kube-apiserver.
    // Use protobuf as the content type for loopback communication
    genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
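    // Initialize the clients for each resource.
    // Create a loopback client that connects to this API server; objects inside the API server are accessed through informers built on top of it.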
    client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
    if err != nil {
        lastErr = fmt.Errorf("failed to create clientset: %v", err)
        return
    }

    kubeClientConfig := genericConfig.LoopbackClientConfig
    // Informers wrap calls to the kube-apiserver and mainly provide list and watch functionality
    sharedInformers = informers.NewSharedInformerFactory(client, 10*time.Minute)
    clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
    if err != nil {
        lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
        return
    }
    versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

    ···
    return
}

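③ Create the APIExtensionsConfig object via createAPIExtensionsConfig.
APIExtensionsConfig is similar to the kube-apiserver config, except that it serves custom extensions. Internally its main job is still to create a SimpleRestOptionsFactory, that is, the etcd-related configuration. It is independent of the kube-apiserver configuration: the generic config is copied by value (a shallow copy) and then adjusted.
Go to createAPIExtensionsConfig: k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go -> createAPIExtensionsConfig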

func createAPIExtensionsConfig(
    kubeAPIServerConfig genericapiserver.Config,
    externalInformers kubeexternalinformers.SharedInformerFactory,
    pluginInitializers []admission.PluginInitializer,
    commandOptions *options.ServerRunOptions,
    masterCount int,
) (*apiextensionsapiserver.Config, error) {

    // Copy the configuration by value (a shallow copy)
    genericConfig := kubeAPIServerConfig
   
    ···
    // etcd access object: initialize the RESTOptionsGetter
    genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
    ···
    //apiextensionsConfig 
    apiextensionsConfig := &apiextensionsapiserver.Config{
        ···
       }
    return apiextensionsConfig, nil
}
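
The assignment `genericConfig := kubeAPIServerConfig` copies the struct by value. In Go this is a shallow copy: plain fields become independent, while maps, slices, and pointers inside the struct still refer to the same underlying data. A small sketch of that behaviour, using a hypothetical `Config` type and a hypothetical `deriveConfig` helper:

```go
package main

import "fmt"

// Config is a hypothetical, reduced stand-in for genericapiserver.Config.
type Config struct {
	RESTOptionsGetter string            // plain value field
	Hooks             map[string]string // reference-typed field
}

// deriveConfig copies base by value and then overrides one field, which is
// the pattern used when a derived config is built from the generic config.
func deriveConfig(base Config, getter string) Config {
	derived := base // copies the struct; the map inside is still shared
	derived.RESTOptionsGetter = getter
	return derived
}

func main() {
	base := Config{RESTOptionsGetter: "kube", Hooks: map[string]string{}}
	ext := deriveConfig(base, "apiextensions")
	ext.Hooks["crd-informer"] = "registered"

	// The overridden value field is independent of the original.
	fmt.Println(base.RESTOptionsGetter, ext.RESTOptionsGetter)
	// The map is shared, so the write through ext is visible via base.
	fmt.Println(len(base.Hooks))
}
```

This is why overriding RESTOptionsGetter on the copy does not affect the kube-apiserver's own config, while reference-typed fields need care.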

④ Create the extensions server via createAPIExtensionsServer.
Location: k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go -> createAPIExtensionsServer
Go to New: k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go -> New
It creates the REST objects for the custom resources and initializes their handlers.

// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    ···
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }
    // Create a loopback client that connects to this server
    crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
    
    s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
    // Initialize the related handlers
    ···
    // Register the hooks
    ···
    return s, nil
}

Go to InstallAPIGroup: k8s.io/apiserver/pkg/server/genericapiserver.go -> InstallAPIGroup
It exposes the API group.

// Exposes the given api group in the API.
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
    ···
    if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo); err != nil {
        return err
    }
    ···
    return nil
}

Go to installAPIResources: k8s.io/apiserver/pkg/server/genericapiserver.go -> installAPIResources
It iterates over the group's versions and installs the API for each resource.

// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
    ···
    for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
      ···
      if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
        return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
      }
      ···
    }
    return nil
}

Go to InstallREST: k8s.io/apiserver/pkg/endpoints/groupversion.go -> InstallREST
It registers the API handlers of the custom resources in the go-restful container.

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
    installer := &APIInstaller{
        group:                        g,
        prefix:                       prefix,
        minRequestTimeout:            g.MinRequestTimeout,
        enableAPIResponseCompression: g.EnableAPIResponseCompression,
    }

    apiResources, ws, registrationErrors := installer.Install()
    versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
    versionDiscoveryHandler.AddToWebService(ws)
    container.Add(ws)
    return utilerrors.NewAggregate(registrationErrors)
}
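
The prefix computed at the top of InstallREST decides the URL under which a group version is served. Since path.Join drops empty elements, the core group (whose group name is empty) ends up at /api/v1, while named groups end up under /apis/<group>/<version>. A tiny sketch, with `restPrefix` as a hypothetical helper wrapping the same call:

```go
package main

import (
	"fmt"
	"path"
)

// restPrefix mirrors the prefix computation in InstallREST:
// path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version).
func restPrefix(root, group, version string) string {
	return path.Join(root, group, version)
}

func main() {
	fmt.Println(restPrefix("/api", "", "v1"))                           // /api/v1
	fmt.Println(restPrefix("/apis", "apps", "v1"))                      // /apis/apps/v1
	fmt.Println(restPrefix("/apis", "apiextensions.k8s.io", "v1beta1")) // /apis/apiextensions.k8s.io/v1beta1
}
```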

Go to Install: k8s.io/apiserver/pkg/endpoints/installer.go -> Install
It exposes the APIs through the WebService (ws) and binds each path to its handler. For how ws is used, see go-restful.

// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
    var apiResources []metav1.APIResource
    var errors []error
    ws := a.newWebService()

    // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
    paths := make([]string, len(a.group.Storage))
    var i int = 0
    for path := range a.group.Storage {
        paths[i] = path
        i++
    }
    sort.Strings(paths)
    for _, path := range paths {
        apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
        if err != nil {
            errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
        }
        if apiResource != nil {
            apiResources = append(apiResources, *apiResource)
        }
    }
    return apiResources, ws, errors
}
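
Install sorts the storage paths before registering them because Go map iteration order is not defined. The same idea in isolation looks roughly like this; `sortedPaths` and the sample map are assumptions for the example:

```go
package main

import (
	"fmt"
	"sort"
)

// sortedPaths returns the keys of storage in lexical order, so that
// handlers are always registered in the same sequence.
func sortedPaths(storage map[string]string) []string {
	paths := make([]string, 0, len(storage))
	for p := range storage {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	return paths
}

func main() {
	// The values are placeholders; the real map holds rest.Storage objects.
	storage := map[string]string{
		"pods/status": "podStatusStorage",
		"nodes":       "nodeStorage",
		"pods":        "podStorage",
	}
	for _, p := range sortedPaths(storage) {
		fmt.Println("register", p)
	}
}
```

With the paths sorted, a parent resource such as "pods" is always registered before its subresources such as "pods/status".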

Go to registerResourceHandlers: k8s.io/apiserver/pkg/endpoints/installer.go -> registerResourceHandlers
It creates the actions for each API, covering the different operation types: POST, PUT, DELETE, and so on.

···
    actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
    actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
    actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
    // DEPRECATED in 1.11
    actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

    actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
    if getSubpath {
        actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
    }
    actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
    actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
    actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
    // DEPRECATED in 1.11
    actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
    actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
    actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
···
case "POST": // Create a resource.
    var handler restful.RouteFunction
    if isNamedCreater {
        handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
    } else {
        handler = restfulCreateResource(creater, reqScope, admit)
    }
    handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
    article := getArticleForNoun(kind, " ")
    doc := "create" + article + kind
    if isSubresource {
        doc = "create " + subresource + " of" + article + kind
    }
    route := ws.POST(action.Path).To(handler).
        Doc(doc).
        Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
        Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
        Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
        Returns(http.StatusOK, "OK", producedObject).
        // TODO: in some cases, the API may return a v1.Status instead of the versioned object
        // but currently go-restful can't handle multiple different objects being returned.
        Returns(http.StatusCreated, "Created", producedObject).
        Returns(http.StatusAccepted, "Accepted", producedObject).
        Reads(defaultVersionedObject).
        Writes(producedObject)
    if err := addObjectParams(ws, route, versionedCreateOptions); err != nil {
        return nil, err
    }
    addParams(route, action.Params)
    routes = append(routes, route)
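
The booleans passed to appendIf (isGetter, isCreater, and so on) describe what the storage object is able to do. In the full function they come from type assertions against the storage interfaces, a part the excerpt above omits. The sketch below reproduces the pattern with hypothetical interfaces (`Getter`, `Creater`, `Deleter`) and hypothetical store types; it is not the real rest package.

```go
package main

import "fmt"

// Hypothetical capability interfaces standing in for the rest.* interfaces.
type Getter interface{ Get(name string) string }
type Creater interface{ Create(name string) }
type Deleter interface{ Delete(name string) }

// action pairs a verb with the path it is served on.
type action struct{ Verb, Path string }

// appendIf appends a only when shouldAppend is true.
func appendIf(actions []action, a action, shouldAppend bool) []action {
	if shouldAppend {
		actions = append(actions, a)
	}
	return actions
}

// actionsFor derives the supported verbs from the interfaces that the
// storage object implements.
func actionsFor(resource string, storage interface{}) []action {
	_, isGetter := storage.(Getter)
	_, isCreater := storage.(Creater)
	_, isDeleter := storage.(Deleter)

	itemPath := resource + "/{name}"
	var actions []action
	actions = appendIf(actions, action{"POST", resource}, isCreater)
	actions = appendIf(actions, action{"GET", itemPath}, isGetter)
	actions = appendIf(actions, action{"DELETE", itemPath}, isDeleter)
	return actions
}

// readOnlyStore implements only Getter.
type readOnlyStore struct{}

func (readOnlyStore) Get(name string) string { return name }

// fullStore adds Create and Delete on top of the embedded Get.
type fullStore struct{ readOnlyStore }

func (fullStore) Create(string) {}
func (fullStore) Delete(string) {}

func main() {
	fmt.Println(actionsFor("componentstatuses", readOnlyStore{}))
	fmt.Println(actionsFor("pods", fullStore{}))
}
```

A storage type therefore opts in to a verb simply by implementing the matching interface; no separate registration table is needed.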

⑤ Create the kube-apiserver via CreateKubeAPIServer.
CreateKubeAPIServer is similar to createAPIExtensionsServer, except that it serves the built-in APIs.

Go to CreateKubeAPIServer: k8s.io/kubernetes/cmd/kube-apiserver/app/server.go -> CreateKubeAPIServer

// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete(versionedInformers).New(delegateAPIServer)
    if err != nil {
        return nil, err
    }
    // Register the post-start hooks
    kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
        sharedInformers.Start(context.StopCh)
        return nil
    })
    kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)

    return kubeAPIServer, nil
}
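
CreateKubeAPIServer does not start the informers directly; it registers named post-start hooks that run once the server is up. The following sketch shows the registration pattern only. `hookServer`, `hookFunc`, and the sequential execution are simplifying assumptions; the real hook signature takes a PostStartHookContext, and the real server may run hooks concurrently.

```go
package main

import (
	"fmt"
	"sort"
)

// hookFunc is a hypothetical, simplified hook signature.
type hookFunc func(stopCh <-chan struct{}) error

// hookServer is a reduced stand-in for GenericAPIServer.
type hookServer struct {
	hooks map[string]hookFunc
}

// AddPostStartHook registers a named hook and rejects duplicate names.
func (s *hookServer) AddPostStartHook(name string, h hookFunc) error {
	if s.hooks == nil {
		s.hooks = make(map[string]hookFunc)
	}
	if _, exists := s.hooks[name]; exists {
		return fmt.Errorf("post-start hook %q is already registered", name)
	}
	s.hooks[name] = h
	return nil
}

// RunPostStartHooks invokes every hook once the server counts as started.
// Hooks run sequentially in name order to keep the sketch deterministic.
// It returns the names of the hooks that succeeded.
func (s *hookServer) RunPostStartHooks(stopCh <-chan struct{}) []string {
	names := make([]string, 0, len(s.hooks))
	for name := range s.hooks {
		names = append(names, name)
	}
	sort.Strings(names)

	var succeeded []string
	for _, name := range names {
		if err := s.hooks[name](stopCh); err == nil {
			succeeded = append(succeeded, name)
		}
	}
	return succeeded
}

func main() {
	s := &hookServer{}
	started := false
	_ = s.AddPostStartHook("start-informers", func(stopCh <-chan struct{}) error {
		started = true // the real hook calls sharedInformers.Start(context.StopCh)
		return nil
	})

	stopCh := make(chan struct{})
	fmt.Println(s.RunPostStartHooks(stopCh), started)
	close(stopCh)
}
```

The OrDie variants used in the source differ only in that a registration error terminates the process instead of being returned.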

Go to New: k8s.io/kubernetes/pkg/master/master.go -> New
It contains two key functions: InstallLegacyAPI and InstallAPIs. Compared with createAPIExtensionsServer, the overall procedure is split into these two parts.

// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
//   KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
    ···
        m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
    ···
    m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
    return m, nil
}

Go to InstallLegacyAPI: k8s.io/kubernetes/pkg/master/master.go -> InstallLegacyAPI
Apart from some hook handling, it also breaks down into two important functions: NewLegacyRESTStorage and InstallLegacyAPIGroup.

func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        glog.Fatalf("Error building core storage: %v", err)
    }

    controllerName := "bootstrap-controller"
    coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        glog.Fatalf("Error in registering group versions: %v", err)
    }
}

Go to NewLegacyRESTStorage: k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go -> NewLegacyRESTStorage

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    // Initialize the APIGroupInfo structure
    apiGroupInfo := genericapiserver.APIGroupInfo{
        PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
        VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
        Scheme:               legacyscheme.Scheme,
        ParameterCodec:       legacyscheme.ParameterCodec,
        NegotiatedSerializer: legacyscheme.Codecs,
    }
    // Create the corresponding Storage for each resource
    restStorage := LegacyRESTStorage{}

    podTemplateStorage := podtemplatestore.NewREST(restOptionsGetter)

    eventStorage := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    ···
    // Map each path to its Storage
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest,
        "services/proxy":  serviceRestProxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        "nodes":        nodeStorage.Node,
        "nodes/status": nodeStorage.Status,
        "nodes/proxy":  nodeStorage.Proxy,

        "events": eventStorage,

        "limitRanges":                   limitRangeStorage,
        "resourceQuotas":                resourceQuotaStorage,
        "resourceQuotas/status":         resourceQuotaStatusStorage,
        "namespaces":                    namespaceStorage,
        "namespaces/status":             namespaceStatusStorage,
        "namespaces/finalize":           namespaceFinalizeStorage,
        "secrets":                       secretStorage,
        "serviceAccounts":               serviceAccountStorage,
        "persistentVolumes":             persistentVolumeStorage,
        "persistentVolumes/status":      persistentVolumeStatusStorage,
        "persistentVolumeClaims":        persistentVolumeClaimStorage,
        "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
        "configMaps":                    configMapStorage,

        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    }
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
        restStorageMap["pods/eviction"] = podStorage.Eviction
    }
    if serviceAccountStorage.Token != nil {
        restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
    }
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

    return restStorage, apiGroupInfo, nil
}
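
The keys of restStorageMap encode both resources and subresources: "pods" is a resource, while "pods/status" is the status subresource of pods. A minimal sketch of how such keys can be taken apart and grouped follows; `splitSubresource` and `groupByResource` are hypothetical helpers written for this example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// splitSubresource splits a storage key such as "pods/status" into its
// resource ("pods") and subresource ("status") parts.
func splitSubresource(key string) (resource, subresource string) {
	parts := strings.SplitN(key, "/", 2)
	if len(parts) == 2 {
		return parts[0], parts[1]
	}
	return parts[0], ""
}

// groupByResource lists the subresources registered for each resource.
// A resource without subresources maps to an empty list.
func groupByResource(keys []string) map[string][]string {
	grouped := make(map[string][]string)
	for _, key := range keys {
		resource, sub := splitSubresource(key)
		if _, seen := grouped[resource]; !seen {
			grouped[resource] = nil
		}
		if sub != "" {
			grouped[resource] = append(grouped[resource], sub)
		}
	}
	for _, subs := range grouped {
		sort.Strings(subs)
	}
	return grouped
}

func main() {
	keys := []string{"pods", "pods/status", "pods/log", "nodes", "nodes/proxy", "events"}
	grouped := groupByResource(keys)
	for _, r := range []string{"events", "nodes", "pods"} {
		fmt.Println(r, grouped[r])
	}
}
```

Keeping subresources as separate map entries lets each one have its own Storage object, which is why podStorage exposes Status, Log, Exec, and the others as distinct fields.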

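Go to InstallLegacyAPIGroup: k8s.io/apiserver/pkg/server/genericapiserver.go -> InstallLegacyAPIGroup
The installAPIResources call inside this function works like the steps described above for the extensions server: it also exposes the API through go-restful, so we do not go any deeper here.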

func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
    if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
        return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
    }
    // Here apiPrefix is actually /api
    if err := s.installAPIResources(apiPrefix, apiGroupInfo); err != nil {
        return err
    }

    // Install the version handler.
    // Add a handler at /<apiPrefix> to enumerate the supported api versions.
    s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())

    return nil
}

Back in New, go to InstallAPIs: k8s.io/kubernetes/pkg/master/master.go
This also installs resource APIs, but for extension-style resources such as deployments.

// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) {
    apiGroupsInfo := []genericapiserver.APIGroupInfo{}

    for _, restStorageBuilder := range restStorageProviders {
        groupName := restStorageBuilder.GroupName()
        if !apiResourceConfigSource.AnyVersionForGroupEnabled(groupName) {
            glog.V(1).Infof("Skipping disabled API group %q.", groupName)
            continue
        }
        apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
        if !enabled {
            glog.Warningf("Problem initializing API group %q, skipping.", groupName)
            continue
        }
        glog.V(1).Infof("Enabling API group %q.", groupName)

        if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
            name, hook, err := postHookProvider.PostStartHook()
            if err != nil {
                glog.Fatalf("Error building PostStartHook: %v", err)
            }
            m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
        }

        apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
    }

    for i := range apiGroupsInfo {
        if err := m.GenericAPIServer.InstallAPIGroup(&apiGroupsInfo[i]); err != nil {
            glog.Fatalf("Error in registering group versions: %v", err)
        }
    }
}

⑥ Create the aggregator config via createAggregatorConfig.
Go to createAggregatorConfig: k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go

func createAggregatorConfig(
    kubeAPIServerConfig genericapiserver.Config,
    commandOptions *options.ServerRunOptions,
    externalInformers kubeexternalinformers.SharedInformerFactory,
    serviceResolver aggregatorapiserver.ServiceResolver,
    proxyTransport *http.Transport,
    pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
    // make a shallow copy to let us twiddle a few things
    // most of the config actually remains the same.  We only need to mess with a couple items related to the particulars of the aggregator
    // Copy the configuration
    genericConfig := kubeAPIServerConfig
    // etcd access object
    genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
···
    // The aggregatorConfig configuration
    aggregatorConfig := &aggregatorapiserver.Config{
        GenericConfig: &genericapiserver.RecommendedConfig{
            Config:                genericConfig,
            SharedInformerFactory: externalInformers,
        },
        ExtraConfig: aggregatorapiserver.ExtraConfig{
            ProxyClientCert: certBytes,
            ProxyClientKey:  keyBytes,
            ServiceResolver: serviceResolver,
            ProxyTransport:  proxyTransport,
        },
    }
    return aggregatorConfig, nil
}

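⑦ Create the aggregator server via createAggregatorServer.
The AggregatorServer is mainly used for custom aggregated API servers and lets CRDs be registered with the cluster automatically.
⑧ Create the insecure handler chain from the configuration.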
