K8s所有的模块的入口函数都位于kubernetes/cmd/ 目录下。
kube-apiserver下的包结构
main包下是apiserver
app目录下是app包,子目录options下是options包。
入口main 函数
- main函数
// Seed the global random number generator with the current UTC time in nanoseconds.
rand.Seed(time.Now().UTC().UnixNano())
// Create a ServerRunOptions instance and register its flags on pflag.CommandLine.
s := options.NewServerRunOptions()
s.AddFlags(pflag.CommandLine)
flag.InitFlags()
// Initialize the log module and defer flushing it until this function returns.
logs.InitLogs()
defer logs.FlushLogs()
verflag.PrintAndExitIfRequested()
// Run calls CreateKubeAPIServer to build the kubeAPIServer instance.
// wait.NeverStop is a <-chan struct{}.
if err := app.Run(s, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
// NOTE: os.Exit does not run deferred functions, so logs.FlushLogs is skipped on this path.
os.Exit(1)
}
}
main 函数中最前面一堆都是初始化,真正的入口是app.Run
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {}
上面是Run函数在app package中的声明。第一个参数是一个ServerRunOptions结构的指针。第二个参数是一个只读(只能接收)的struct{} channel。
- ServerRunOptions结构体解读
// ServerRunOptions is the options type whose pointer is passed as the first argument of app.Run.
// The pointer fields reference option groups declared in the packages imported as
// genericoptions and kubeoptions; the remaining fields are plain values held directly.
type ServerRunOptions struct {
GenericServerRunOptions *genericoptions.ServerRunOptions
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptions
InsecureServing *kubeoptions.InsecureServingOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
Admission *genericoptions.AdmissionOptions
Authentication *kubeoptions.BuiltInAuthenticationOptions
Authorization *kubeoptions.BuiltInAuthorizationOptions
CloudProvider *kubeoptions.CloudProviderOptions
StorageSerialization *kubeoptions.StorageSerializationOptions
APIEnablement *kubeoptions.APIEnablementOptions
AllowPrivileged bool
EnableLogsHandler bool
EventTTL time.Duration
KubeletConfig kubeletclient.KubeletClientConfig
KubernetesServiceNodePort int
MasterCount int
MaxConnectionBytesPerSec int64
ServiceClusterIPRange net.IPNet // TODO: make this a list
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
EnableAggregatorRouting bool
} // End of the ServerRunOptions definition.
genericoptions是options.go 中import的vendor下的option包的别名
genericoptions.ServerRunOptions
// ServerRunOptions as declared in the package imported under the genericoptions alias.
// It is the type of the GenericServerRunOptions field of the kube-apiserver options struct.
type ServerRunOptions struct {
AdvertiseAddress net.IP
CorsAllowedOriginList []string
ExternalHost string
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
MinRequestTimeout int
TargetRAMMB int
WatchCacheSizes []string
}
AdvertiseAddress apiserver 向集群其他成员通告的ip地址(对应--advertise-address);未设置时使用--bind-address,若--bind-address也未指定则使用主机默认网卡的地址
CorsAllowedOriginList 允许访问的域(CORS),可以使用正则限定
...
具体的各个参数的含义可以通过apiserver --help 查看
etcd etcd集群相关的参数
SecureServing 安全监听的相关参数默认初始化为0.0.0.0:6443
InsecureServing 默认127.0.0.1:8080
...
- wait 包中定义的包级变量NeverStop
// NeverStop is typed as a receive-only channel, so code holding it can neither send on it
// nor close it; a receive from it is therefore expected to block indefinitely.
var NeverStop <-chan struct{} = make(chan struct{})
在main函数中通过NewServerRunOptions使用默认参数初始化一个ServerRunOptions实例的指针,并传递给app包的Run函数
app.Run
// Run builds the API extensions server, the kube API server and the aggregator server in that
// order, handing each previously built server to the next constructor, and then serves until
// the final Run call returns. Any construction error is returned to the caller unchanged.
//
// runOptions supplies the configuration for every Create*/create* call below; stopCh is passed
// to NonBlockingRun and to the final Run call.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
// The node tunneler and proxy transport are inputs to the kube API server config; the
// proxy transport is reused for the aggregator config further down.
nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)
if err != nil {
return err
}
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
if err != nil {
return err
}
// TPRs are enabled and not yet beta, since this these are the successor, they fall under the same enablement rule
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
if err != nil {
return err
}
// The API extensions server is created first, with EmptyDelegate as its delegate.
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.EmptyDelegate)
if err != nil {
return err
}
// The kube API server receives the API extensions server's GenericAPIServer.
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter)
if err != nil {
return err
}
// if we're starting up a hacked up version of this API server for a weird test case,
// just start the API server as is because clients don't get built correctly when you do this
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
// In this branch the kube API server is run directly; the aggregator is never built.
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
return kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// PrepareRun's return value is discarded here; only the aggregator's prepared server is
// run at the end of this function.
kubeAPIServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport)
if err != nil {
return err
}
// These two values were already passed to createAggregatorConfig above; they are also
// assigned onto the resulting config explicitly.
aggregatorConfig.ProxyTransport = proxyTransport
aggregatorConfig.ServiceResolver = serviceResolver
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
}
// When insecure serving options are present, serve the aggregator's unprotected handler
// (wrapped by BuildInsecureHandlerChain) through NonBlockingRun before the final Run call.
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
app.Run 函数通过一系列的调用最终启用一个sync loop。
在经过一系列初始化后先获取环境变量KUBE_API_VERSIONS。如果设置了这个环境变量且insecureServingOptions不为nil,就会调用NonBlockingRun函数。
- NonBlockingRun函数
kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh)
然后NonBlockingRun又调用了同一个文件中的 serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh)函数,这里的insecureServingInfo和insecureHandler这2个参数就是NonBlockingRun的前2个参数,internalStopCh是在NonBlockingRun中自己定义的一个struct{} channel,以只读channel的形式传递给serveInsecurely。
然后NonBlockingRun又启动一个goroutine.
// Block until stopCh yields (a value or a close), then propagate the stop by closing
// internalStopCh.
go func() {
<-stopCh
close(internalStopCh)
}()
这里的stopCh是wait包里声明的那个channel,由Run函数传入。关闭的是NonBlockingRun函数传递给下面的函数的channel。
- serveInsecurely 函数
使用insecureServingInfo.BindAddress和insecureHandler实例化一个http.Server结构。然后调用server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh)。RunServer函数位于k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/serve.go文件中,注意这里的serve.go 文件和cmd下的server.go 文件不是同一个。stopCh是在NonBlockingRun中定义的chan struct{},不是Run函数传递进来的那个。
- RunServer 函数
根据传递来的http.Server 结构中的Addr和network调用net.Listen 监听。如果network为空则设置为tcp,Addr为空则报错退出。在RunServer中还启动了2个goroutine
// Block until stopCh yields (a value or a close), then close the listener ln.
go func() {
<-stopCh
ln.Close()
}()
上面这个goroutine中,stopCh解除阻塞后就关闭已建立的tcp监听(ln)。注意这里的stopCh 是 NonBlockingRun make的 channel。RunServer 启动2个goroutine后直接返回serveInsecurely函数,再返回NonBlockingRun函数;在NonBlockingRun函数中如果返回错误会close internalStopCh这个channel,这样就解除了stopCh这个channel的阻塞。
// Serve on ln in a separate goroutine and report how serving ended.
go func() {
// Deferred, so it runs when this goroutine's function exits, including via the panic below.
defer utilruntime.HandleCrash()
var listener net.Listener
// Wrap ln as a tcpKeepAliveListener; the type assertion panics if ln is not a *net.TCPListener.
listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
// Layer TLS on top only when the server carries a TLS config.
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
// NOTE(review): presumably server is an *http.Server, whose Serve blocks until the
// listener fails or is closed — confirm against the enclosing function.
err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", tcpAddr.String())
select {
case <-stopCh:
// stopCh is already readable, so the stop was requested: just log it.
glog.Info(msg)
default:
// Serving ended while stopCh was not readable: surface err through a panic.
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
这个goroutine利用RunServer中net.Listen返回的Listener,生成一个tcpKeepAliveListener(如果server.TLSConfig不为nil,再用tls.NewListener包装成TLS监听),然后调用server.Serve 开始提供服务。跟上一个goroutine一样,这个goroutine在返回到NonBlockingRun中然后close channel后就解除阻塞,并打印信息。