APIServer的API 路由的实现

读APIServer 最好奇的是Aggregator到底是怎么转发的。资源是怎么管理的。这块代码很晦涩。我们可以理解一下。首先要理解k8s 的api 的分类
apiserver实际负责对外提供kubernetes RESTful API的服务,同时提供 https(默认监听6443端口)和http(默认监听8080端口)

它是系统管理指令的统一接口,任何对资源的增删该查都要交给apiserver处理后再交给etcd。

客户端通过list-watch监听apiserver中资源的create、update、delete事件,并针对事件类型调用相应的事件处理函数。

image

可以使用一个聚合器去聚合k8s的api server与用户开发的其它api server(如metrics-server等)

API Aggregation允许在不修改Kubernetes核心代码的同时扩展Kubernetes API

开启API Aggregation,需要在kube-apiserver增加部分配置

整个kubeAPIServer提供了三类API Resource接口:

  • core group:主要在 /api/v1 下, 这个是Default APIServer 提供的API 处理的
  • named groups:其 path 为 /apis/$GROUP/$VERSION APISever。如metrics-server。 通常管这类APIServer提供的API 叫做Aggregated API。这部分分为两种情况 一种情况是是内置的api ,例如extensionserver对应apis/apiextensions.k8s.io ,一种是外部定义的服务api。 这两种处理的方式不同,在后面会细讲。
  • 系统状态的一些 API:如 /metrics/version

而API的URL大致以 /apis/{group}/{version}/namespaces/{namespace}/resource/{name} 组成,结构如下图所示:

image.png

kubernetes的 Aggregated API是什么呢?它是允许k8s的开发人员编写一个自己的服务,可以把这个服务注册到k8s的api里面,这样,就像k8s自己的api一样,你的服务只要运行在k8s集群里面,k8s 的Aggregate通过service名称就可以转发到你写的service里面去了。
这个设计理念:
第一是增加了api的扩展性,这样k8s的开发人员就可以编写自己的API服务器来公开他们想要的API。集群管理员应该能够使用这些服务,而不需要对核心库存储库进行任何更改。
第二是丰富了APIs,核心kubernetes团队阻止了很多新的API提案。通过允许开发人员将他们的API作为单独的服务器公开,并使集群管理员能够在不对核心库存储库进行任何更改的情况下使用它们,这样就无须社区繁杂的审查了
第三是开发分阶段实验性API的地方,新的API可以在单独的聚集服务器中开发,当它稳定之后,那么把它们封装起来安装到其他集群就很容易了。
第四是确保新API遵循kubernetes约定:如果没有这里提出的机制,社区成员可能会被迫推出自己的东西,这可能会或可能不遵循kubernetes约定。

APIServices

APIServer 的类型都是APIServices ,但是k8s的api server会创建相应Local APIServices

kind: APIService
metadata:
  creationTimestamp: "2020-04-10T09:21:43Z"
  labels:
    kube-aggregator.kubernetes.io/automanaged: onstart
  name: v1.apps
  resourceVersion: "4"
  uid: 95cb5138-9100-4e1e-9568-de7d0d21389b
spec:
  group: apps
  groupPriorityMinimum: 17800
  version: v1
  versionPriority: 15
status:
  conditions:
  - lastTransitionTime: "2020-04-10T09:21:43Z"
    message: Local APIServices are always available
    reason: Local
    status: "True"
    type: Available

扩展Kubernetes API需要创建APIService资源对象,配置将哪些客户端请求代理到用户开发的api server
例如要访问metrics-server,则由群组/apis/metrics.k8s.io/v1beta1来获取

apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  labels:
    k8s-app: metrics-server
  name: v1beta1.metrics.k8s.io
spec:
  group: metrics.k8s.io
  groupPriorityMinimum: 100
  insecureSkipTLSVerify: true
  service:
    name: metrics-server
    namespace: kube-system
    port: 443  # 默认为443
  version: v1beta1
  versionPriority: 100

为什么要有local 非local 之分,这里面其实要对go-restful 要有理解。下面讲的内容在后文会详细介绍。在server启动的时候会首先建立服务链。CreateServerChain: 创建3个server(APIExtensionsServer,KubeAPIServer,AggregatorServer),这3个server每一个struct中都包含一个GenericAPIServer以及其他信息,例如APIExtensionsServer包含Informers,KubeAPIServer包含ClusterAuthenticationInfo。GenericAPIServer有个字段是DelegationTarget, 字面意思是代理,3个Server通过该代理进行连接,连接关系为:AggregatorServer-->KubeAPIServer-->APIExtensionsServer,其中AggregatorServe为入口。

因此,在对K8s 原生的的服务链中, 从AggregatorServe-->KubeAPIServer-->APIExtensionsServer 是要走local restful 的代理调用链的,也就是内置api 是有代理调用的顺序的。而从AggregatorServer --> customized APIServer 就不能走原生的调用链了,因为没有办法侵入到k8s的源码。因此AggregatorServer 就需要支持两条路,一条是local 调用到KubeAPIServer 在从KubeAPIServer 到APIExtensionsServer,一条是remote调用到自定义的KubeAPIServer。

先介绍AggregatorServer到APIServer是如何运行的。

不管local 还是remote 都需要生成一个APIServices, remote 的APIServices 需要用户创建,同同时关联一个service 对象,local的service 有bootstrapcontroller 来创建,也会关联一个service 对象。 当service创建好时AggregatorServer的apiserviceRegistrationController 就会监听。

apiserviceRegistrationController负责根据APIService定义的aggregated server service构建代理,将CR的请求转发给后端的aggregated server。
apiService有两种类型:Local(Service为空)以及Service(Service非空)。

apiserviceRegistrationController负责对这两种类型apiService设置代理:

  • Local类型会直接路由给kube-apiserver进行处理;
  • 而Service类型则会设置代理并将请求转化为对aggregated Service的请求(proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version)。
  • 请求的负载均衡策略则是优先本地访问kube-apiserver(如果service为kubernetes default apiserver service:443)=>通过service ClusterIP:Port访问(默认) 或者 通过随机选择service endpoint backend进行访问:
func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
  ...
    proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
    // v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
    if apiService.Name == legacyAPIServiceName {
        proxyPath = "/api"
    }
    // register the proxy handler
    proxyHandler := &proxyHandler{
        localDelegate:   s.delegateHandler,
        proxyClientCert: s.proxyClientCert,
        proxyClientKey:  s.proxyClientKey,
        proxyTransport:  s.proxyTransport,
        serviceResolver: s.serviceResolver,
        egressSelector:  s.egressSelector,
    }
  ...
    s.proxyHandlers[apiService.Name] = proxyHandler
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
  ...
    // it's time to register the group aggregation endpoint
    groupPath := "/apis/" + apiService.Spec.Group
    groupDiscoveryHandler := &apiGroupHandler{
        codecs:    aggregatorscheme.Codecs,
        groupName: apiService.Spec.Group,
        lister:    s.lister,
        delegate:  s.delegateHandler,
    }
    // aggregation is protected
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
    s.handledGroups.Insert(apiService.Spec.Group)
    return nil
}
// k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go:109
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // 加载roxyHandlingInfo处理请求  
    value := r.handlingInfo.Load()
    if value == nil {
        r.localDelegate.ServeHTTP(w, req)
        return
    }
    handlingInfo := value.(proxyHandlingInfo)
  ...
    // 判断APIService服务是否正常
    if !handlingInfo.serviceAvailable {
        proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
        return
    }
    // 将原始请求转化为对APIService的请求
    // write a new location based on the existing request pointed at the target service
    location := &url.URL{}
    location.Scheme = "https"
    rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
    if err != nil {
        klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
        proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
        return
    }
    location.Host = rloc.Host
    location.Path = req.URL.Path
    location.RawQuery = req.URL.Query().Encode()
    newReq, cancelFn := newRequestForProxy(location, req)
    defer cancelFn()
   ...
    proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
    handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
    handler.ServeHTTP(w, newReq)
}

上面path是

proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version 或者api
再看proxyhandler,当是local 时 走的localDelegate,而此时localDelegate实际上就是内置KubeAPIServer.GenericAPIServer

    if handlingInfo.local {
        if r.localDelegate == nil {
            http.Error(w, "", http.StatusNotFound)
            return
        }
        r.localDelegate.ServeHTTP(w, req)
        return
    }

当是非local时,才从service 对象解析一个endpoint进行转发。这下就解释了apiserver到底是怎么进行路由的。上面的updateAPIService就是更新这个proxy的后端service

func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) {
  if apiService.Spec.Service == nil {
      r.handlingInfo.Store(proxyHandlingInfo{local: true})
      return
  }

  newInfo := proxyHandlingInfo{
      restConfig: &restclient.Config{
          TLSClientConfig: restclient.TLSClientConfig{
              Insecure:   apiService.Spec.InsecureSkipTLSVerify,
              ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
              CertData:   r.proxyClientCert,
              KeyData:    r.proxyClientKey,
              CAData:     apiService.Spec.CABundle,
          },
      },
      serviceName:      apiService.Spec.Service.Name,
      serviceNamespace: apiService.Spec.Service.Namespace,
  }
  newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
  if newInfo.transportBuildingError == nil && r.proxyTransport.Dial != nil {
      switch transport := newInfo.proxyRoundTripper.(type) {
      case *http.Transport:
          transport.Dial = r.proxyTransport.Dial
      default:
          newInfo.transportBuildingError = fmt.Errorf("unable to set dialer for %s/%s as rest transport is of type %T", apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, newInfo.proxyRoundTripper)
          glog.Warning(newInfo.transportBuildingError.Error())
      }
  }
  r.handlingInfo.Store(newInfo)
}

这个restConfig就是调用service的客户端参数,其中
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",就是具体的service。而上面watch service的变化就是为了动态更新这个apiservice后端handler所用的service。

上面章节解释了从aggregator 到 apiserver, 那么apiserver request 是如何转发到extension server 呢?
在kubernetes-master/cmd/kube-apiserver/app/server.go CreateServerChain方法中apiExtensionsServer先创建

    notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
    if err != nil {
        return nil, err
    }

NewEmptyDelegateWithCustomHandler 创建了空的delegateTarget作为结尾

func NewEmptyDelegateWithCustomHandler(handler http.Handler) DelegationTarget {
    return emptyDelegate{handler}
}

而在创建kubeAPIServer时, kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer) apiExtensionsServer.GenericAPIServer作为delegateTarget传入

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    return kubeAPIServer, nil
}

在kubernetes-master/staging/src/k8s.io/apiserver/pkg/server/config.go的newnew New方法中, 在运行APIServerHandler时传入了delegationTarget.UnprotectedHandler()
···
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
···
这个方法就设定了delegateTarget 也就是extension server 的handler 是 NotFoundHandler

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
    nonGoRestfulMux := mux.NewPathRecorderMux(name)
    if notFoundHandler != nil {
        nonGoRestfulMux.NotFoundHandler(notFoundHandler)
    }
...
}

在apiServerHandler的ServeHTTP方法里可以看到,当都不match 的时候,就走入了notfoundhandler 的范围d.nonGoRestfulMux.ServeHTTP(w, req)

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    path := req.URL.Path

    // check to see if our webservices want to claim this path
    for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
        switch {
        case ws.RootPath() == "/apis":
            // if we are exactly /apis or /apis/, then we need special handling in loop.
            // normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
            // We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
            if path == "/apis" || path == "/apis/" {
                klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                // don't use servemux here because gorestful servemuxes get messed up when removing webservices
                // TODO fix gorestful, remove TPRs, or stop using gorestful
                d.goRestfulContainer.Dispatch(w, req)
                return
            }

        case strings.HasPrefix(path, ws.RootPath()):
            // ensure an exact match or a path boundary match
            if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
                klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                // don't use servemux here because gorestful servemuxes get messed up when removing webservices
                // TODO fix gorestful, remove TPRs, or stop using gorestful
                d.goRestfulContainer.Dispatch(w, req)
                return
            }
        }
    }

    // if we didn't find a match, then we just skip gorestful altogether
    klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
    d.nonGoRestfulMux.ServeHTTP(w, req)
}

从上文就解释了 如何从aggrator到apiserver再如何到extendsionserer 的路由过程。

附录

go-restful

go-restful是第三方的REST框架,在GitHub上有多个贡献者,采用了“路由”映射的设计思想,并且在API设计中使用了流行的Fluent Style风格,试用起来酣畅淋漓,也难怪Kubernetes选择了它。下面是go-restful的优良特性。

  • Ruby on Rails风格的Rest路由映射,例如/people/{person_id}/groups/{group_id}。
  • 大大简化了Rest API的开发工作。
  • 底层实现采用Golang的HTTP协议栈,几乎没有限制。
  • 拥有完整的单元包代码,很容易开发一个可测试的Rest API。
  • Google AppEngine ready。

go-restful框架中的核心对象如下:

  • restful.Container:代表了一个HTTP Rest服务器,包括一组restful.WebService对象和一个http.ServeMux对象,使用RouteSelector进行请求派发。
  • restful.WebService:标识一个Rest服务,由多个Rest路由(restful.Route)组成,这一组Rest路由共享同一个RootPath。
  • restful.Route:标识一个Rest路由,Rest路由主要由Rest Path、HTTP Method、输入输出类型(HTML/JSON)及对应的回调函数restful.RouteFunction组成。
  • restful.RouteFunction:一个用于处理具体的REST调用的函数接口定义,具体定义为type RouteFunction func(*Request, *Response)。

服务链

服务链的核心是DelegationTarget接口,它让API Server可以实现链式服务,当有HTTP请求到来时,优先让链首去处理URI,如果能够匹配成功就处理,否则交给下一链,一直到链尾。DelegationTarget的定义如下:

ype DelegationTarget interface {
    // UnprotectedHandler returns a handler that is NOT protected by a normal chain
    UnprotectedHandler() http.Handler
 
    // RequestContextMapper returns the existing RequestContextMapper.  Because we cannot rewire all existing
    // uses of this function, this will be used in any delegating API server
    RequestContextMapper() apirequest.RequestContextMapper
 
    // PostStartHooks returns the post-start hooks that need to be combined
    PostStartHooks() map[string]postStartHookEntry
 
    // PreShutdownHooks returns the pre-stop hooks that need to be combined
    PreShutdownHooks() map[string]preShutdownHookEntry
 
    // HealthzChecks returns the healthz checks that need to be combined
    HealthzChecks() []healthz.HealthzChecker
 
    // ListedPaths returns the paths for supporting an index
    ListedPaths() []string
 
    // NextDelegate returns the next delegationTarget in the chain of delegations
    NextDelegate() DelegationTarget
}

  • 它有一个空的实现emptyDelegate,一般作为链尾,由于是空的实现,所以具体的定义就不列举了。
  • 它的另一个实现是GenericAPIServer,如下所示:

type GenericAPIServer struct {
......
    // delegationTarget is the next delegate in the chain or nil
    delegationTarget DelegationTarget
    // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
    HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup

GenericAPIServer定义了一个delegationTarget成员,在API Server整套系统中,总共有三个服务,出了链尾,都是指向GenericAPIServer实例,该成员让GenericAPIServer实现了一套链的功能。

Bootstrapcontroller

bootstrap controller 的初始化以及启动是在 CreateKubeAPIServer 调用链的 InstallLegacyAPI 方法中完成的,bootstrap controller 的启停是由 apiserver 的 PostStartHook 和 ShutdownHook 进行控制的。
kubernetes-master/pkg/controlplane/instance.go

func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }

    // 初始化 bootstrap-controller
    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(......)
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}

可以看具体逻辑在kubernetes-master/pkg/controlplane/controller.go 的start 方法为apiserver生成了service对象

func (c *Controller) Start() {
    if c.runner != nil {
        return
    }

    // Reconcile during first run removing itself until server is ready.
    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
        klog.Error("Found stale data, removed previous endpoints on kubernetes service, apiserver didn't exit successfully previously")
    } else if !storage.IsNotFound(err) {
        klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
    }

    repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
    repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)

    // run all of the controllers once prior to returning from Start.
    if err := repairClusterIPs.RunOnce(); err != nil {
        // If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
        klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
    }
    if err := repairNodePorts.RunOnce(); err != nil {
        // If we fail to repair node ports apiserver is useless. We should restart and retry.
        klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
    }

    c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
    c.runner.Start()
}

// Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
func (c *Controller) Stop() {
    if c.runner != nil {
        c.runner.Stop()
    }
    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    finishedReconciling := make(chan struct{})
    go func() {
        defer close(finishedReconciling)
        klog.Infof("Shutting down kubernetes service endpoint reconciler")
        c.EndpointReconciler.StopReconciling()
        if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
            klog.Errorf("Unable to remove endpoints from kubernetes service: %v", err)
        }
    }()

    select {
    case <-finishedReconciling:
        // done
    case <-time.After(2 * c.EndpointInterval):
        // don't block server shutdown forever if we can't reach etcd to remove ourselves
        klog.Warning("RemoveEndpoints() timed out")
    }
}

c.RunKubernetesNamespaces

c.RunKubernetesNamespaces 主要功能是创建 kube-system 和 kube-public 命名空间,如果启用了 NodeLease 特性功能还会创建 kube-node-lease 命名空间,之后每隔一分钟检查一次。

k8s.io/kubernetes/pkg/master/controller.go:199

func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
    wait.Until(func() {
        for _, ns := range c.SystemNamespaces {
            if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
                runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
            }
        }
    }, c.SystemNamespacesInterval, ch)
}
c.RunKubernetesService
c.RunKubernetesService 主要是检查 kubernetes service 是否处于正常状态,并定期执行同步操作。首先调用 /healthz 接口检查 apiserver 当前是否处于 ready 状态,若处于 ready 状态然后调用 c.UpdateKubernetesService 服务更新 kubernetes service 状态。

k8s.io/kubernetes/pkg/master/controller.go:210

func (c *Controller) RunKubernetesService(ch chan struct{}) {
    wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
        var code int
        c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
        return code == http.StatusOK, nil
    }, ch)

    wait.NonSlidingUntil(func() {
        if err := c.UpdateKubernetesService(false); err != nil {
            runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
        }
    }, c.EndpointInterval, ch)
}
c.UpdateKubernetesService
c.UpdateKubernetesService 的主要逻辑为:
  • 1、调用 createNamespaceIfNeeded 创建 default namespace;
  • 2、调用 c.CreateOrUpdateMasterServiceIfNeeded 为 master 创建 kubernetes service;
  • 3、调用 c.EndpointReconciler.ReconcileEndpoints 更新 master 的 endpoint;

k8s.io/kubernetes/pkg/master/controller.go:230

func (c *Controller) UpdateKubernetesService(reconcile bool) error {
    if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
        return err
    }

    servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
    if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
            return err
    }
    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
        return err
    }
    return nil
}

c.EndpointReconciler.ReconcileEndpoints

EndpointReconciler 的具体实现由 EndpointReconcilerType 决定,EndpointReconcilerType 是 --endpoint-reconciler-type 参数指定的,可选的参数有 master-count, lease, none,每种类型对应不同的 EndpointReconciler 实例,在 v1.16 中默认为 lease,此处仅分析 lease 对应的 EndpointReconciler 的实现。

一个集群中可能会有多个 apiserver 实例,因此需要统一管理 apiserver service 的 endpoints,c.EndpointReconciler.ReconcileEndpoints 就是用来管理 apiserver endpoints 的。一个集群中 apiserver 的所有实例会在 etcd 中的对应目录下创建 key,并定期更新这个 key 来上报自己的心跳信息,ReconcileEndpoints 会从 etcd 中获取 apiserver 的实例信息并更新 endpoint。

k8s.io/kubernetes/pkg/master/reconcilers/lease.go:144

func (r *leaseEndpointReconciler) ReconcileEndpoints(......) error {
    r.reconcilingLock.Lock()
    defer r.reconcilingLock.Unlock()

    if r.stopReconcilingCalled {
        return nil
    }

    // 更新 lease 信息
    if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
        return err
    }

    return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
func (r *leaseEndpointReconciler) doReconcile(......) error {
    // 1、获取 master 的 endpoint
    e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
    shouldCreate := false
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }

        shouldCreate = true
        e = &corev1.Endpoints{
            ObjectMeta: metav1.ObjectMeta{
                Name:      serviceName,
                Namespace: corev1.NamespaceDefault,
            },
        }
    }

    // 2、从 etcd 中获取所有的 master
    masterIPs, err := r.masterLeases.ListLeases()
    if err != nil {
        return err
    }

    if len(masterIPs) == 0 {
        return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
    }

    // 3、检查 endpoint 中 master 信息,如果与 etcd 中的不一致则进行更新
    formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
    if formatCorrect && ipCorrect && portsCorrect {
        return nil
    }

    if !formatCorrect {
        e.Subsets = []corev1.EndpointSubset{{
            Addresses: []corev1.EndpointAddress{},
            Ports:     endpointPorts,
        }}
    }
    if !formatCorrect || !ipCorrect {
        e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
        for ind, ip := range masterIPs {
            e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
        }

        e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
    }

    if !portsCorrect {
        e.Subsets[0].Ports = endpointPorts
    }

    if shouldCreate {
        if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
            err = nil
        }
    } else {
        _, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
    }
    return err
}

repairClusterIPs.RunUntil

repairClusterIP 主要解决的问题有:

  • 保证集群中所有的 ClusterIP 都是唯一分配的;
  • 保证分配的 ClusterIP 不会超出指定范围;
  • 确保已经分配给 service 但是因为 crash 等其他原因没有正确创建 ClusterIP;
  • 自动将旧版本的 Kubernetes services 迁移到 ipallocator 原子性模型;

repairClusterIPs.RunUntil其实是调用 repairClusterIPs.runOnce 来处理的,其代码中的主要逻辑如下所示:

k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller/repair.go:134

func (c *Repair) runOnce() error {
    ......

    // 1、首先从 etcd 中获取已经使用 ClusterIP 的快照
    err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
        var err error
        snapshot, err = c.alloc.Get()
        if err != nil {
            return false, err
        }

        if c.shouldWorkOnSecondary() {
            secondarySnapshot, err = c.secondaryAlloc.Get()
            if err != nil {
                return false, err
            }
        }
        return true, nil
    })
    if err != nil {
        return fmt.Errorf("unable to refresh the service IP block: %v", err)
    }
    // 2、判断 snapshot 是否已经初始化
    if snapshot.Range == "" {
        snapshot.Range = c.network.String()
    }

    if c.shouldWorkOnSecondary() && secondarySnapshot.Range == "" {
        secondarySnapshot.Range = c.secondaryNetwork.String()
    }

    stored, err = ipallocator.NewFromSnapshot(snapshot)
    if c.shouldWorkOnSecondary() {
        secondaryStored, secondaryErr = ipallocator.NewFromSnapshot(secondarySnapshot)
    }

    if err != nil || secondaryErr != nil {
        return fmt.Errorf("unable to rebuild allocator from snapshots: %v", err)
    }
    // 3、获取 service list
    list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return fmt.Errorf("unable to refresh the service IP block: %v", err)
    }

    // 4、将 CIDR 转换为对应的 IP range 格式
    var rebuilt, secondaryRebuilt *ipallocator.Range
    rebuilt, err = ipallocator.NewCIDRRange(c.network)

    ......

    // 5、检查每个 Service 的 ClusterIP,保证其处于正常状态
    for _, svc := range list.Items {
        if !helper.IsServiceIPSet(&svc) {
            continue
        }
        ip := net.ParseIP(svc.Spec.ClusterIP)
        ......

        actualAlloc := c.selectAllocForIP(ip, rebuilt, secondaryRebuilt)
        switch err := actualAlloc.Allocate(ip); err {
        // 6、检查 ip 是否泄漏
        case nil:
            actualStored := c.selectAllocForIP(ip, stored, secondaryStored)
            if actualStored.Has(ip) {
                actualStored.Release(ip)
            } else {
                ......
            }
            delete(c.leaks, ip.String())
        // 7、ip 重复分配
        case ipallocator.ErrAllocated:
            ......
        // 8、ip 超出范围
        case err.(*ipallocator.ErrNotInRange):
            ......
        // 9、ip 已经分配完
        case ipallocator.ErrFull:
            ......
        default:
            ......
        }
    }
    // 10、对比是否有泄漏 ip
    c.checkLeaked(stored, rebuilt)
    if c.shouldWorkOnSecondary() {
        c.checkLeaked(secondaryStored, secondaryRebuilt)
    }

    // 11、更新快照
    err = c.saveSnapShot(rebuilt, c.alloc, snapshot)
    if err != nil {
        return err
    }

    if c.shouldWorkOnSecondary() {
        err := c.saveSnapShot(secondaryRebuilt, c.secondaryAlloc, secondarySnapshot)
        if err != nil {
            return nil
        }
    }
    return nil
}

repairNodePorts.RunUnti

repairNodePorts 主要是用来纠正 service 中 nodePort 的信息,保证所有的 ports 都基于 cluster 创建的,当没有与 cluster 同步时会触发告警,其最终是调用 repairNodePorts.runOnce 进行处理的,主要逻辑与 ClusterIP 的处理逻辑类似。

k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller/repair.go:84

func (c *Repair) runOnce() error {
    // 1、首先从 etcd 中获取已使用 nodeport 的快照
    err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
        var err error
        snapshot, err = c.alloc.Get()
        return err == nil, err
    })
    if err != nil {
        return fmt.Errorf("unable to refresh the port allocations: %v", err)
    }
    // 2、检查 snapshot 是否初始化
    if snapshot.Range == "" {
        snapshot.Range = c.portRange.String()
    }
    // 3、获取已分配 nodePort 信息
    stored, err := portallocator.NewFromSnapshot(snapshot)
    if err != nil {
        return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
    }
    // 4、获取 service list
    list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return fmt.Errorf("unable to refresh the port block: %v", err)
    }

    rebuilt, err := portallocator.NewPortAllocator(c.portRange)
    if err != nil {
        return fmt.Errorf("unable to create port allocator: %v", err)
    }

    // 5、检查每个 Service ClusterIP 的 port,保证其处于正常状态
    for i := range list.Items {
        svc := &list.Items[i]
        ports := collectServiceNodePorts(svc)
        if len(ports) == 0 {
            continue
        }
        for _, port := range ports {
            switch err := rebuilt.Allocate(port); err {
            // 6、检查 port 是否泄漏
            case nil:
                if stored.Has(port) {
                    stored.Release(port)
                } else {
                    ......
                }
                delete(c.leaks, port)
            // 7、port 重复分配
            case portallocator.ErrAllocated:
                ......
            // 8、port 超出分配范围
            case err.(*portallocator.ErrNotInRange):
                ......
            // 9、port 已经分配完
            case portallocator.ErrFull:
                ......
            default:
                ......
            }
        }
    }
    // 10、检查 port 是否泄漏
    stored.ForEach(func(port int) {
        count, found := c.leaks[port]
        switch {
        case !found:
            ......
            count = numRepairsBeforeLeakCleanup - 1
            fallthrough
        case count > 0:
            c.leaks[port] = count - 1
            if err := rebuilt.Allocate(port); err != nil {
                runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
            }
        default:
            ......
        }
    })

    // 11、更新 snapshot
    if err := rebuilt.Snapshot(snapshot); err != nil {
        return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
    }
    ......
    return nil
}
```
以上就是 bootstrap controller 的主要实现
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容