kube-apiserver进程的入口类源码位置如下:\kubernetes-master\cmd\kube-apiserver\apiserver.go
Apiserver通过Run方法启动, 主要逻辑为:
- 调用CreateServerChain构建服务调用链并判断是否启动非安全的httpserver,httpserver链中包含 apiserver要启动的三个server,以及为每个server注册对应资源的路由;
- 调用server.PrepareRun进行服务运行前的准备,该方法主要完成了健康检查. 存活检查和OpenAPI路由的注册工作;
- 调用prepared.Run启动server;
apiserver是K8S最重要的组成部分,不论是命令操作还是通过remote API进行控制,实际都需要经过apiserver。
apiserver是k8s系统中所有对象的增删改查盯的http/restful式服务端,其中盯是指watch操作。数据最终存储在分布式一致的etcd存储内,apiserver本身是无状态的,提供了这些数据访问的认证鉴权、缓存、api版本适配转换等一系列的功能。
Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{})
|
|————1) CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error)
| |
| |————1) CreateKubeAPIServerConfig(*options.ServerRunOptions, tunneler.Tunneler, *http.Transport) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error)
| | |
| | |——————1) defaultOptions(s *options.ServerRunOptions)
| | |——————2) Validate() []error
| | |——————3) BuildGenericConfig(s *options.ServerRunOptions, *http.Transport) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error)
| | | |
| | | |————1) NewConfig(codecs serializer.CodecFactory) *Config :包含DefaultBuildHandlerChain
| | | |————2) genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, legacyscheme.Scheme)
| | | |————3) genericapiserver.DefaultSwaggerConfig()
| | | |————4) BuildStorageFactory(s *options.ServerRunOptions) (*serverstorage.DefaultStorageFactory, error)
| | | |————5) s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
| | | |————6) BuildAuthenticator
| | | |————7) BuildAuthorizer
| | | |————8) BuildAdmissionPluginInitializers
| | |
| | |——————4) &master.Config=genericapiserver.Config+ExtraConfig
| |
| |————2) createAPIExtensionsConfig(genericapiserver.Config, kubeexternalinformers.SharedInformerFactory, *options.ServerRunOptions) (*apiextensionsapiserver.Config, error):设置RESTOptionsGetter
| |————3) createAPIExtensionsServer( *apiextensionsapiserver.Config, genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error)
| | |
| | |————1) apiextensionsConfig.Complete():CompletedConfig
| | | |————(c *RecommendedConfig) Complete() CompletedConfig :cfg.GenericConfig.Complete()
| | | |————(c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig: c.Config.Complete(c.SharedInformerFactory)
| | |————2) (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget):*CustomResourceDefinitions
| | |————1) c.GenericConfig.New("apiextensions-apiserver", delegationTarget):*GenericAPIServer
| | | |————1) NewAPIServerHandler(name string, request.RequestContextMapper, runtime.NegotiatedSerializer, HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler
| | | | |————1) nonGoRestfulMux := genericmux.NewPathRecorderMux(name)
| | | | |————2) gorestfulContainer := restful.NewContainer()
| | | |————2) &GenericAPIServer
| | | |————3) installAPI(s *GenericAPIServer, c *Config) //第一次装载
| | | |———— 1) routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
| | | |———— 2) routes.SwaggerUI{}.Install(s.Handler.NonGoRestfulMux)
| | | |———— 3) routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
| | | |———— 4) routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
| | | |———— 5) routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
| | | |———— 6) routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
| | | |———— 7) s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
| | |————2) genericapiserver.NewDefaultAPIGroupInfo
| | |————3) apiGroupInfo里设置VersionedResourcesStorageMap,NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST
| | |————4) s.InstallAPIGroup(apiGroupInfo *APIGroupInfo)
| | | |———— (*GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo)
| | | |———— (g *APIGroupVersion) InstallREST(container *restful.Container)
| | | |———— (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error)
| | | |———— (a *APIInstaller) newWebService() *restful.WebService
| | | |———— 最终的API注册过程是在这个函数中完成的,把一个rest.Storage对象转换为实际的getter, lister等处理函数,并和实际的url关联起来。
| | | |———— (a *APIInstaller) registerResourceHandlers(path string, rest.Storage, *restful.WebService, proxyHandler http.Handler) (*metav1.APIResource, error)
| | |————5) s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
| | |————6) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
| |————4) CreateKubeAPIServer(*master.Config, genericapiserver.DelegationTarget, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory) (*master.Master, error)
| | |————1) kubeAPIServerConfig.Complete(versionedInformers):master.CompletedConfig:cfg.GenericConfig.Complete(informers)
| | | |————(c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig: c.Config.Complete(c.SharedInformerFactory)
| | | //第二次装载 功能性的、主要的api是在m.InstallAPIs()中装载的。
| | |————2) (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget):*Master
| | |————1) c.GenericConfig.New("kube-apiserver", delegationTarget)
| | |————2) (m *Master) InstallLegacyAPI(c *completedConfig, generic.RESTOptionsGetter, corerest.LegacyRESTStorageProvider)
| | | |————1) (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error)
| | | | |———— nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport): //func NewStorage(...就是controller创建
| | | | |———— (e *Store) CompleteWithOptions(options *generic.StoreOptions)
| | | | |———— (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource):generic.RESTOptions
| | | | |———— ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
| | | | |———— 1) s, d := generic.NewRawStorage(storageConfig)
| | | | | |———— s, d, err := factory.Create(*config)
| | | | | |———— newETCD2Storage(c)
| | | | | |———— newETCD3Storage(c)
| | | | |———— 2) cacher := storage.NewCacherFromConfig(cacherConfig)
| | | |————2) (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo)
| | |————3) (m *Master) InstallAPIs(serverstorage.APIResourceConfigSource, generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider)
| | |———— (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo)
| |————5) kubeAPIServer.GenericAPIServer.PrepareRun()
| | |————1) routes.Swagger{Config: s.swaggerConfig}.Install(s.Handler.GoRestfulContainer)
| | |————2) routes.OpenAPI{Config: s.openAPIConfig,}.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
| | |————3) s.installHealthz()
| |————6) apiExtensionsServer.GenericAPIServer.PrepareRun()
| |————7) createAggregatorConfig(genericapiserver.Config, *options.ServerRunOptions, kubeexternalinformers.SharedInformerFactory, aggregatorapiserver.ServiceResolver, *http.Transport) (*aggregatorapiserver.Config, error)
| |————8) createAggregatorServer(*aggregatorapiserver.Config, genericapiserver.DelegationTarget, apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error)
| | |————1) aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer):*APIAggregator
| | |———— c.GenericConfig.New("kube-aggregator", delegationTarget)
| |————9) BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.Handler
| |————10) NonBlockingRun( *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error
| |————serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{})
| |————server.RunServer(insecureServer, ln, shutDownTimeout, stopCh)
|
|————2) (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer
|
|————3) (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error
|————(s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{})
|———— (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) 设置APIServerHandler为http处理函数
|———— RunServer(secureServer, s.SecureServingInfo.Listener, s.ShutdownTimeout, stopCh)
关键结构:
ServerRunOptions结构:
路径: cmd/kube-apiserver/app/options/options.go
type ServerRunOptions struct {
// 重名,下面称为GenericServerRunOptions
GenericServerRunOptions *genericoptions.ServerRunOptions // 服务器通用的运行参数
AllowPrivileged bool // 是否配置超级权限,即允许Pod中运行的容器拥有系统特权
EventTTL time.Duration // 事件留存事件, 默认1h
KubeletConfig kubeletclient.KubeletClientConfig // K8S kubelet配置
MaxConnectionBytesPerSec int64 // 每秒的最大连接数
// 指定的话,可以通过SSH指定的秘钥文件和用户名对Node进行访问
SSHKeyfile string
SSHUser string
// 包含PEM-encoded x509 RSA公钥和私钥的文件路径,用于验证Service Account的token
// 不指定的话,则使用--tls-private-key-file指定的文件
ServiceAccountKeyFile string
// 设置为true时,系统会到etcd验证ServiceAccount token是否存在
ServiceAccountLookup bool
WebhookTokenAuthnConfigFile string
WebhookTokenAuthnCacheTTL time.Duration
}
ServerRunOptions结构:
路径: pkg/genericapiserver/options/server_run_options.go
type ServerRunOptions struct {
// 准入控制,如:"AlwaysAdmit","LimitRanger","ReousrceQuota"等
AdmissionControl string
// 准入控制的配置文件
AdmissionControlConfigFile string
// 用于广播给集群的所有成员自己的IP地址,不指定的话就使用"--bind-address"的IP地址
AdvertiseAddress net.IP
// 安全访问的认证模式列表,以逗号分隔,包括:AlwaysAllow、AlwaysDeny、ABAC、Webhook、RBAC
AuthorizationMode string
// mode设置为ABAC时使用的csv格式的授权配置文件
AuthorizationPolicyFile string
// 下列跟mode配置成webhook有关
AuthorizationWebhookConfigFile string
AuthorizationWebhookCacheAuthorizedTTL time.Duration
AuthorizationWebhookCacheUnauthorizedTTL time.Duration
// mode设置为RBAC时使用的超级用户名,用该用户名进行RBAC认证
AuthorizationRBACSuperUser string
AnonymousAuth bool
// 使用http基本认证的方式访问API Server的安全端口
BasicAuthFile string
// 默认"0.0.0.0",apiServer在该地址的6443端口上开启https服务
BindAddress net.IP
// TLS证书所在目录,默认"/var/run/kubernetes"
CertDirectory string
// 指定的话,该客户端证书将用于认证过程
ClientCAFile string
// 下列的云服务商有关
CloudConfigFile string
CloudProvider string
// CORS 跨域资源共享
CorsAllowedOriginList []string
// 默认的持久化存储格式,比如"application/json"
DefaultStorageMediaType string
// 指定清理的工作线程数,可以提高清理namespace的效率,但是会增加系统资源的占用
DeleteCollectionWorkers int
// 日志相关策略
AuditLogPath string
AuditLogMaxAge int
AuditLogMaxBackups int
AuditLogMaxSize int
// 使能GC
EnableGarbageCollection bool
// 打开性能分析,可以通过<host>:<port>/debug/pprof/地址来查看程序栈,线程等信息
EnableProfiling bool
EnableContentionProfiling bool
// 使能swaggerUI,访问地址<host>:<port>/swagger-ui
EnableSwaggerUI bool
// 使能watch cache,对所有的watch操作进行缓存
EnableWatchCache bool
// 按资源覆盖etcd服务的设置,以逗号分隔,比如group/resource#servers,其中servers为: http://ip:port
EtcdServersOverrides []string
StorageConfig storagebackend.Config
// 用于生成该master对外的URL地址
ExternalHost string
// 绑定的不安全地址,即8080端口绑定的地址
InsecureBindAddress net.IP
// 非安全端口,默认8080
InsecurePort int
// 设置keystone鉴权插件地址
KeystoneURL string
KeystoneCAFile string
KubernetesServiceNodePort int
LongRunningRequestRE string
// master数量
MasterCount int
// 设置master服务所在的namespace,默认为default
MasterServiceNamespace string
// 同时处理的最大请求数,默认为400,超过该请求数将被拒绝。仅用于长时间执行的请求
MaxRequestsInFlight int
// 最小请求处理超时时间,默认1800s,仅用于watch request
MinRequestTimeout int
// 该文件内设置鉴权机构
OIDCCAFile string
OIDCClientID string
OIDCIssuerURL string
OIDCUsernameClaim string
OIDCGroupsClaim string
RequestHeaderUsernameHeaders []string
RequestHeaderClientCAFile string
RequestHeaderAllowedNames []string
// 一组key=value用于运行时的配置信息。api/<groupVersion>/<resource>,用于打开或者关闭对某个API版本的支持
// api/all和api/legacy特别用于支持所有版本的API或支持旧版本的API
RuntimeConfig config.ConfigurationMap
// https安全端口,默认6443;设置为0,表示不开启https
SecurePort int
// service的Cluster IP池
ServiceClusterIPRange net.IPNet // TODO: make this a list
// service的NodePort模式下能使用的主机端口号范围,默认是30000--32767
ServiceNodePortRange utilnet.PortRange
// 持久化存储的资源版本号,例如"group1/version1,group2/version2,..."
StorageVersions string
// The default values for StorageVersions. StorageVersions overrides
// these; you can change this if you want to change the defaults (e.g.,
// for testing). This is not actually exposed as a flag.
DefaultStorageVersions string
TargetRAMMB int
// TLS CA文件
TLSCAFile string
// 包含x509证书的文件路径,用于https认证
TLSCertFile string
// 包含x509与tls-cert-file对应的私钥文件路径
TLSPrivateKeyFile string
SNICertKeys []config.NamedCertKey
// 用于访问APIServer安全端口的token认证文件路径
TokenAuthFile string
// 使能token
EnableAnyToken bool
// 设置各资源对象watch缓存大小的列表,以逗号分隔,格式为resource#size
// 前提是EnableWatchCache为true
WatchCacheSizes []string
}
ApiServer启动
路径:kubernetes/cmd/kube-apiserver/apiserver.go
入口main()函数:
func main() {
rand.Seed(time.Now().UTC().UnixNano())
// 新建一个apiserver对象
s := options.NewServerRunOptions()
// 接受用户命令行输入,其实就是自定义上述apiserver对象
s.AddFlags(pflag.CommandLine)
// 解析并格式化用户传入的参数,最后填充APIServer结构体的各成员
flag.InitFlags()
// 初始化log配置,包括log输出位置、log等级等。
logs.InitLogs()
// 保证了即使apiserver异常崩溃了也能将内存中的log信息保存到磁盘文件中。
defer logs.FlushLogs()
// 如果用户只是想看apiserver的版本号而不是启动apiserver,则打印apiserver的版本号并退出。
verflag.PrintAndExitIfRequested()
// 将创建的apiserver对象传入app.Run()中,最终绑定本地端口并绑定本地端口并创建一个HTTP Server与一个HTTPS Server。
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
新建一个apiserver对象---NewServerRunOptions():
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
// 初始化通用的apiserver运行参数,包括etcd后端存储参数
GenericServerRunOptions: genericoptions.NewServerRunOptions().WithEtcdOptions(),
// 事件的存储保留时间
EventTTL: 1 * time.Hour,
// Node上kubelet的客户端配置
KubeletConfig: kubeletclient.KubeletClientConfig{
// kubelet通信端口
Port: ports.KubeletPort,
PreferredAddressTypes: []string{
string(api.NodeHostName),
string(api.NodeInternalIP),
string(api.NodeExternalIP),
string(api.NodeLegacyHostIP),
},
// 是否开启https
EnableHttps: true,
// HTTP超时
HTTPTimeout: time.Duration(5) * time.Second,
},
// 将webhook token authenticator返回的响应保存在缓存内的时间
WebhookTokenAuthnCacheTTL: 2 * time.Minute,
}
return &s
}
上面的接口在初始化GenericServerRunOptions参数时又调用了genericoptions.NewServerRunOptions().WithEtcdOptions()
接口,先来看下与上面接口名字一样的NewServerRunOptions():
func NewServerRunOptions() *ServerRunOptions {
return &ServerRunOptions{
// 以逗号作为分隔符的Admission Control插件的排序列表
AdmissionControl: "AlwaysAdmit",
AnonymousAuth: false,
// 授权模式
AuthorizationMode: "AlwaysAllow",
AuthorizationWebhookCacheAuthorizedTTL: 5 * time.Minute,
AuthorizationWebhookCacheUnauthorizedTTL: 30 * time.Second,
// apiserver绑定的网卡地址
BindAddress: net.ParseIP("0.0.0.0"),
// 证书目录
CertDirectory: "/var/run/kubernetes",
// 默认的对象存储类型
DefaultStorageMediaType: "application/json",
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableProfiling: true,
EnableContentionProfiling: false,
EnableWatchCache: true,
// HTTP绑定的IP地址
InsecureBindAddress: net.ParseIP("127.0.0.1"),
// 不安全端口(HTTP)
InsecurePort: 8080,
LongRunningRequestRE: DefaultLongRunningRequestRE,
// Kubernetes系统中Master的数量
MasterCount: 1,
MasterServiceNamespace: api.NamespaceDefault,
MaxRequestsInFlight: 400,
MinRequestTimeout: 1800,
// k8s运行时环境配置
RuntimeConfig: make(config.ConfigurationMap),
// 安全端口
SecurePort: 6443,
ServiceNodePortRange: DefaultServiceNodePortRange,
StorageVersions: registered.AllPreferredGroupVersions(),
}
}
可以看到初始化的时候会有SecurePort、InsecurePort,实际就是对应HTTP、HTTPS的绑定端口。
我们可以看到这里的控制还是很全面的,包括安全控制(CertDirectory, HTTPS默认启动)、权限控制(AdmissionControl,AuthorizationMode)、服务限流控制(MaxRequestsInFlight)等。
具体的参数前面介绍结构体时基本都有提到。
继续后端存储etcd的配置初始化WithEtcdOptions():
func (o *ServerRunOptions) WithEtcdOptions() *ServerRunOptions {
o.StorageConfig = storagebackend.Config{
// etcd的默认路径前缀:/registry
Prefix: DefaultEtcdPathPrefix,
// 反序列化cache,未设置的话,会根据apiServer的内存限制进行配置
DeserializationCacheSize: 0,
}
return o
}
到这里apiServer的运行参数初始化关键性步骤基本结束,至于后面的s.AddFlags(pflag.CommandLine)就是获取命令行的输入信息,然后进行重新覆盖,这里就不讲了。
可以根据kube-apiserver进程的命令行信息,把命令行传参和结构配置进行对应:
#/usr/bin/kube-apiserver --logtostderr=true --v=0 --etcd-servers=http://test-master:2379 --insecure-bind-address=0.0.0.0 --port=8080 --kubelet-port=10250 --allow-privileged=false --service-cluster-ip-range=10.254.0.0/16 --admission-control=NamespaceLifecycle,NamespaceExists,LimitRanger,SecurityContextDeny,ServiceAccount,ResourceQuota --service-account-key-file=/var/run/kubernetes/apiserver.key
初始化完成之后,最重要的任务就是启动实例了。
所有的操作都是在run函数中执行,app.run()接口实现在cmd/kube-apiserver/app/server.go。
在最后APIServer会启动HTTP/HTTPS服务。
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
// 创建调用链
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
// 进行一些准备工作, 注册一些hander,执行hook等
prepared, err := server.PrepareRun()
if err != nil {
return err
}
// 开始执行
return prepared.Run(stopCh)
}
执行NonBlockingRun k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
auditStopCh := make(chan struct{})
// 1. 判断是否要启动审计日志
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// 2. 启动 https server
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
return err
}
}
go func() {
<-stopCh
close(s.readinessStopCh)
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait()
close(auditStopCh)
}()
// 3. 执行 postStartHooks
s.RunPostStartHooks(stopCh)
// 4. 向 systemd 发送 ready 信号
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return nil
}
pkg目录下
admission 提供了一些权限管理的控制器
apis 是由code-generator自动生成的一些api包装器。
audit 是授权信息提取模块。给其他服务组件用的。
authentication 是角色管理的组件
authorization 是认证模块
endpoinsts 处理请求的http/https端点。
features
registry 跟etcd打交道的
server apiserver的 web模块
storage 跟etcd打交道的
utils
plugin/pkg
CreateServerChain
首先是创建ServerChain
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// nodetunneler与node通信,proxy实现代理功能,转发请求给其他apiservice
// apiserver到cluster的通信可以通过三种方法
// apiserver到kubelet的endpoint,用于logs功能,exec功能,port-forward功能
// HTTP连接,即使可以用HTTPS也不做任何其他校验,并不安全
// ssh tunnel,不推荐使用
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
// 1. 为 kubeAPIServer 创建配置
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// 2. 判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,vc
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
// 3. 初始化 APIExtensionsServer, 通过一个空的delegate初始化
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
// 4. 初始化 KubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
// 5. 创建 AggregatorConfig
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig. ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6. 初始化 AggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
return nil, err
}
// 7. 判断是否启动非安全端口的 http server
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
return aggregatorServer, nil
}
这部分代码比较清晰,CreateNodeDialer是根据配置生成ssh tunneler,用以连接各个node的kubelet。另外就是创建了3个config和3个server:
config:
- controlplane.Config
- apiextensionsapiserver.Config
- aggregatorapiserver.Config
server:
- apiextensionsapiserver.CustomResourceDefinitions
- kubeAPIServer.kubeAPIServer
- aggregatorapiserver.APIAggregator
config和server的对应关系应当一目明了吧,这里是按照创建顺序书写的。
controlplane.Config的配置是相当的多,包含genericapiserver.Config和ExtraConfig, genericapiserver.Config的注释里面也说了,在代码里是按照重要级别排序的,前面几个分别是server配置、验证配置、授权配置。ExtraConfig主要包含的是apiserver额外的信息:集群认证信息、etcd配置等。
type Config struct {
GenericConfig *genericapiserver.Config
ExtraConfig ExtraConfig
}
这里主要方法是CreateKubeAPIServerConfig,该方法返回controlplane.Config:
创建 apiserver 通用配置
在 CreateServerChain 函数中,基本可以从函数名猜出来每一步在干什么,通用配置的创建就是在函数 CreateKubeAPIServerConfig() 中完成的。进入到以下函数中看一下详细实现:CreateKubeAPIServerConfig() --> buildGenericConfig()。
创建通用配置流程主要有以下几步:
1.GenericConfig 实例化
代码如下:
#buildGenericConfig
func buildGenericConfig(
...
#默认生成api文档
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
...
#采用etcd作为存储方案
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
#认证机制
if lastErr = s.Authentication.ApplyTo
#授权机制
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
#准入机制
err = s.Admission.ApplyTo()
}
2.StorageFactoryConfig
apiserver 组件使用 etcd 作为集群的存储,系统中所使用的所有资源、集群状态、配置等都在这上面保存。代码部分如下:
// 初始化 storageFactoryConfig 配置对象。
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
// 初始化 etcd 相关的配置信息,补全配置对象,返回 completedStorageFactoryConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
// 根据上面补完的配置信息,创建 storageFactory 对象
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
return
}
3.Authorizer认证、授权配置
作为整个系统的存储对象交互入口,每个系统的请求都需要经过认证、授权、准入控制器这些阶段,准入控制器下面再说,涉及到的代码如下:
认证机制
s.Authentication.ApplyTo
func (o *BuiltInAuthenticationOptions) ApplyTo(authInfo *genericapiserver.AuthenticationInfo, secureServing *genericapiserver.SecureServingInfo, egressSelector *egressselector.EgressSelector, openAPIConfig *openapicommon.Config, extclient kubernetes.Interface, versionedInformer informers.SharedInformerFactory){
...
#实例化Config
authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
...
}
authenticatorConfig.New
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
#重点关注authenticators 和 tokenAuthenticators两个变量
var authenticators []authenticator.Request
var tokenAuthenticators []authenticator.Token
#添加requestHeader认证方式
if config.RequestHeaderConfig != nil {
requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
...
)
#追加认证方式
authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
}
#添加CLientCA认证方式
if config.ClientCAContentProvider != nil {
...
}
#添加Token认证方式
if len(config.TokenAuthFile) > 0 {
tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
}
#...其他各种认证方式
#如果没有认证方式 则启动anonymous
if len(authenticators) == 0 {
if config.Anonymous {
return anonymous.NewAuthenticator(), &securityDefinitions, nil
}
return nil, &securityDefinitions, nil
}
#整合两种认证方式authenticators 和 tokenAuthenticators
authenticator := union.New(authenticators...)
}
return authenticator, &securityDefinitions, nil
}
``
授权机制
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
调用此函数 authorizationConfig.New()
New
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
...
case modes.ModeNode:
...
const (
...
#对生产来说最有用出的模式RBAC 角色-用户模式
ModeRBAC string = "RBAC"
...
)
New 函数主要就是在 for 循环中根据 config.AuthorizationModes 配置了 authorizers 和 ruleResolvers 两个变量,这个 config.AuthorizationModes 是在最初在初始化配置对象执行 options.NewServerRunOptions() 的时候赋值的,具体路径及代码如下:
k8s.io/kubernetes/pkg/kubeapiserver/options/authorization.go
func NewBuiltInAuthorizationOptions() *BuiltInAuthorizationOptions {
return &BuiltInAuthorizationOptions{
// 初始赋值就一个 "AlwaysAllow" 字符串,这是默认的配置。
Modes: []string{authzmodes.ModeAlwaysAllow},
WebhookVersion: "v1beta1",
WebhookCacheAuthorizedTTL: 5 * time.Minute,
WebhookCacheUnauthorizedTTL: 30 * time.Second,
WebhookRetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
}
}
在回到New()中,在函数最后返回了认证和授权对象。
return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
authorizers 是已启用的认证器列表,union.New将它合并成一个认证器。
ruleResolvers 是已启用的规则解析器,union.NewRuleResolvers 也是合并了一下
可以看到默认的授权是 AlwaysAllow,具体其它类型可以在启动的时候在配置里面设置,只要配置了,就会实例化该授权对象,认证的时候会遍历每一个授权器,有一个认证成功就ok。
CreateKubeAPIServerConfig该方法创建了apiserver所需要的所有资源,并将ServerRunOptions里面的参数转化为genericapiserver.Config和ExtraConfig。
创建APIExtensionConfig
创建服务链的第二步是创建ApiExtensionConfig, ApiExtensionConfig与ApiServerConfig几乎一致,在ExtraConfig上是不同的, ApiExtensionConfig的ExtraConfig只有几个字段,少很多。创建ApiExtensionConfig的方法是createAPIExtensionsConfig, 该方法主要做了四件事:
- 将ApiServerConfig中的GenericConfig进行了浅拷贝。
- 重写GenericConfig的AdmissionControl,因为ApiExtentionServer需要用自己的scheme。
- 拷贝了etcd的配置,避免相互影响。
- 重写MergedResourceConfig。
创建APIExtensionServer
当创建好APIExtensionConfig之后,就是创建APIExtensionServer,调用方法如下:
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
作为request 处理链的最后一环,这里传了一个空的DelegateTarget,和后面创建KubeApiServer不一样,创建KubeApiServer传入了APIExtensionServer的GenericAPIServer。回到createAPIExtensionsServer方法,最终会调用到apiextension-apiserver/apiserver#New()
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
if err != nil {
return nil, err
}
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
... ...
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
... ...
delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
delegateHandler = http.NotFoundHandler()
}
versionDiscoveryHandler := &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
delegate: delegateHandler,
}
groupDiscoveryHandler := &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
establishingController,
c.ExtraConfig.ServiceResolver,
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.RequestTimeout,
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
apiGroupInfo.StaticOpenAPISpec,
c.GenericConfig.MaxRequestBodyBytes,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
crdClient.Apiextensions(),
crdHandler,
)
var openapiController *openapicontroller.Controller
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
}
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
// OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
// Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
// choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) && s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}
go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
return nil
})
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
// to sync makes sure that the lister will be valid before we begin. There may still be races for CRDs added after startup,
// but we won't go healthy until we can handle the ones already present.
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
}, context.StopCh)
})
return s, nil
}
apiextensions-apiserver/apiserver.go这个文件几乎只写了这个New方法。
- New方法的首先是通过GenericConfig New了一个GenericServer,该Server将GenericConfig中的各个配置转化为自己的属性,同时New了APIServerHandler, APIServerHandler包含APIServer使用的各种http Handler: FullHandlerChain、GoRestfulContainer、NonGoRestfulMux、Director。
其次是暴露API Group。APIServer中是有各种不同的API Group, 比如networking.k8s.io/v1beta1、apps/v1beta1、metrics.k8s.io/v1beta1等。ExtensionApiServer主要是一些自定义的API Group。 - NewCustomResourceDefinitionHandler 为上面GenericServer中的Handlers赋值。处理后续的API请求。
实例化各种Controller,包括:NewDiscoveryController、NewNamingConditionController、NewConditionController、NewKubernetesAPIApprovalPolicyConformantConditionController、NewCRDFinalizer、openapicontroller。每个controller完成各自对应的业务逻辑。 - 添加PostStart Hook, 当APIServer启动之后,异步协程调用各个Controller的Run方法。
APIExtensionsServer最先初始化,在调用链的末尾, 处理CR、CRD相关资源.
其中包含的 controller 以及功能如下所示:
- openapiController:将 crd 资源的变化同步至提供的 OpenAPI 文档,可通过访问 /openapi/v2 进行查看;
- crdController:负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者的信息可通过 kubectl api-resources 查看;
- namingController:检查 crd obj 中是否有命名冲突,可在 crd .status.conditions 中查看;
- establishingController:检查 crd 是否处于正常状态,可在 crd .status.conditions 中查看;
- nonStructuralSchemaController:检查 crd obj 结构是否正常,可在 crd .status.conditions 中查看;
- apiApprovalController:检查 crd 是否遵循 kubernetes API 声明策略,可在 crd .status.conditions 中查看;
- finalizingController:类似于 finalizes 的功能,与 CRs 的删除有关;
c.GenericConfig.New来初始化genericapiserver,包裹一些默认链,创建handler
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
if c.Serializer == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
}
if c.LoopbackClientConfig == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil")
}
if c.EquivalentResourceRegistry == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.EquivalentResourceRegistry == nil")
}
// 包裹了DefaultBuildHandlerChain
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
// 创建apiserverhandler
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
...
return s, nil
}
APIServerHandler包含多种http.Handler类型,包括go-restful以及non-go-restful,以及在以上两者之间选择的Director对象,go-restful用于处理已经注册的handler,non-go-restful用来处理不存在的handler,API URI处理的选择过程为:FullHandlerChain-> Director ->{GoRestfulContainer, NonGoRestfulMux}。NewAPIServerHandler`
unc NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
// non-go-restful路由
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
// go-resetful路由
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
// 选择器, 根据path选择是否执行go-restful,注册过的path执行go-restful
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
以上是APIExtensionsServer的初始化流程,初始化Server, 调用s.GenericAPIServer.InstallAPIGroup注册api。此方法的调用链非常深,主要是为了将需要暴露的API Resource注册到 server 中,以便能通过 http 接口进行 resource 的 REST 操作,其他几种 server 在初始化时也都会执行对应的 InstallAPI方法