1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/controller/replicaset
分支: tming-v1.13 (基于v1.13版本)
本文将分析
controller-manager
的启动过程.
2. 启动
在
kubernetes/cmd/kube-controller-manager/controller-manager.go
中启动.
// kubernetes/cmd/kube-controller-manager/controller-manager.go
// main is the kube-controller-manager entry point shown in this excerpt.
// It seeds math/rand with the current time, builds the root command via
// app.NewControllerManagerCommand and executes it. If Execute returns an
// error, the error is printed to stderr and the process exits with status 1.
// The "..." line marks code elided by the article.
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewControllerManagerCommand()
...
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
进入到
// NewControllerManagerCommand builds the "kube-controller-manager" cobra
// command (excerpt; "..." lines mark code elided by the article).
//
// Visible steps:
//  1. Create the default options object s.
//  2. Inside the command's Run callback, turn s into a config object c by
//     passing the known controller names and the disabled-by-default names.
//  3. Call Run with the completed config and wait.NeverStop as the stop
//     channel; if Run returns an error, print it to stderr and exit with
//     status 1.
func NewControllerManagerCommand() *cobra.Command {
// Step 1: default options. The handling of err is in the elided part.
s, err := options.NewKubeControllerManagerOptions()
...
cmd := &cobra.Command{
Use: "kube-controller-manager",
...
Run: func(cmd *cobra.Command, args []string) {
...
// Step 2: options -> config.
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
...
// Step 3: start the controller manager.
if err := Run(c.Complete(), wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
...
return cmd
}
这里需要注意三个地方:
1.s, err := options.NewKubeControllerManagerOptions()
创建一个默认的KubeControllerManagerOptions
对象s
.
2.c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
通过传入的参数将s
变成一个Config
对象.
3.Run(c.Complete(), wait.NeverStop)
启动程序.
接下来将从这三个部分来继续分析.
2.1 创建KubeControllerManagerOptions对象
// cmd/kube-controller-manager/app/options/options.go
// NewKubeControllerManagerOptions creates a KubeControllerManagerOptions
// object (excerpt; the rest of the body is elided by the article).
// The visible part builds the default component config from
// ports.InsecureKubeControllerManagerPort and returns (nil, err) if that fails.
func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
if err != nil {
return nil, err
}
...
}
这里主要根据系统的一些默认值生成了一个
KubeControllerManagerOptions
对象.
2.2 生成Config对象
===> kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// KnownControllers returns the names of all known controllers: the keys of
// the map returned by NewControllerInitializers(IncludeCloudLoops) plus
// saTokenControllerName, which is inserted explicitly here.
func KnownControllers() []string {
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
ret.Insert(
saTokenControllerName,
)
return ret.List()
}
// NewControllerInitializers returns a map from controller name to the
// InitFunc that starts that controller (excerpt; "..." lines mark
// registrations elided by the article).
// The "service" and "route" controllers are registered only when loopMode
// equals IncludeCloudLoops.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
...
controllers["deployment"] = startDeploymentController
...
controllers["nodeipam"] = startNodeIpamController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
}
...
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
return controllers
}
// ControllersDisabledByDefault is the set of controller names passed (as a
// list) to KubeControllerManagerOptions.Config as the
// disabledByDefaultControllers argument.
var ControllersDisabledByDefault = sets.NewString(
"bootstrapsigner",
"tokencleaner",
)
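可以看到 KnownControllers 返回的是所有已知 controller 的名字 (NewControllerInitializers 的所有 key 再加上 saTokenControllerName), 而 NewControllerInitializers 中每个名字对应的 InitFunc 就是启动该 controller 的方法. 这些名字连同 ControllersDisabledByDefault 一起传给下面的 Config 方法.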
// Config validates the options and converts them into a
// kubecontrollerconfig.Config.
//
// allControllers and disabledByDefaultControllers are handed to s.Validate.
// Any failure (validation, self-signed cert defaulting, kubeconfig loading,
// client creation, ApplyTo) is returned as (nil, err); creation of the
// leader-election client uses NewForConfigOrDie instead of returning an error.
func (s KubeControllerManagerOptions) Config(allControllers []string, disabledByDefaultControllers []string) (*kubecontrollerconfig.Config, error) {
if err := s.Validate(allControllers, disabledByDefaultControllers); err != nil {
return nil, err
}
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
// Build the REST config from the master URL / kubeconfig path, then apply
// the content type, QPS and burst taken from the generic client-connection
// options.
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
if err != nil {
return nil, err
}
kubeconfig.ContentConfig.ContentType = s.Generic.ClientConnection.ContentType
kubeconfig.QPS = s.Generic.ClientConnection.QPS
kubeconfig.Burst = int(s.Generic.ClientConnection.Burst)
// Create the client used to talk to the api-server.
client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, KubeControllerManagerUserAgent))
if err != nil {
return nil, err
}
// shallow copy, do not modify the kubeconfig.Timeout.
config := *kubeconfig
// The leader-election client gets its own timeout equal to the renew deadline.
config.Timeout = s.Generic.LeaderElection.RenewDeadline.Duration
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
eventRecorder := createRecorder(client, KubeControllerManagerUserAgent)
c := &kubecontrollerconfig.Config{
Client: client,
Kubeconfig: kubeconfig,
EventRecorder: eventRecorder,
LeaderElectionClient: leaderElectionClient,
}
// Copy the remaining option values onto the config.
if err := s.ApplyTo(c); err != nil {
return nil, err
}
return c, nil
}
可以看到 Config 方法中生成了四个变量:
- client: 用于与 api-server 进行交流.
- kubeconfig: 连接 api-server 的配置.
- eventRecorder: 用于记录事件.
- leaderElectionClient: 高可用 (leader 选举) 的时候用到.
2.3 Run
===>cmd/kube-controller-manager/app/controllermanager.go
// Run starts the controller manager from a completed config (excerpt; "..."
// lines mark code elided by the article, which is where clientBuilder,
// rootClientBuilder, unsecuredMux and electionChecker are presumably defined).
//
// It defines a run closure that chooses a client builder, creates the
// controller context, starts the controllers and the informers, and then
// blocks forever. If leader election is disabled the closure is called
// directly; otherwise it is installed as the OnStartedLeading callback of
// leaderelection.RunOrDie.
//
// In the visible code the only error returned to the caller comes from
// os.Hostname; other failures terminate the process through klog.Fatalf.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
...
run := func(ctx context.Context) {
...
// When service-account credentials are requested, controllers get their
// clients from an SAControllerClientBuilder in the kube-system namespace;
// otherwise they all use rootClientBuilder.
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
// This is why --service-account-private-key-file should be specified at startup.
klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
}
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig),
CoreClient: c.Client.CoreV1(),
AuthenticationClient: c.Client.AuthenticationV1(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
// Start all controllers.
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
// Start all registered informers.
controllerContext.InformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)
// Block forever: run never returns.
select {}
}
// Leader election disabled: run directly in this goroutine.
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
// run ends in select {}, so this line is not expected to execute.
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return err
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// The lock is named "kube-controller-manager" in the "kube-system"
// namespace; its type comes from the LeaderElection.ResourceLock setting.
rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
"kube-system",
"kube-controller-manager",
c.LeaderElectionClient.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
})
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
// High availability: run is invoked via the OnStartedLeading callback, and
// the process exits fatally via OnStoppedLeading.
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
WatchDog: electionChecker,
Name: "kube-controller-manager",
})
panic("unreachable")
}
这里需要注意几点:
1. 如果启用了 --use-service-account-credentials 却没有指定 --service-account-private-key-file 文件, 这里会打印一条警告, 所以启动的时候需要指定该文件. 在 k8s源码编译以及二进制安装(用于源码开发调试版) 中已经遇到过了, 该问题会在别的文章分析.
2. controllerContext 的属性中有 InformerFactory, 并且所有的 controller 用的都是这个 InformerFactory, 因此用到同一种 informer 的 controller 拿到的都是同一个对象, 比如所有用到 podInformer 的 controller 用的都是同一个 podInformer.
3. StartControllers 方法中启动所有的 controller, 每个 controller 都是以 goroutine 的方式启动.
===>cmd/kube-controller-manager/app/controllermanager.go
// StartControllers iterates over the controllers map and calls the init
// function of every controller that is enabled (excerpt; "..." lines mark
// code elided by the article, which is presumably where startSATokenController
// and unsecuredMux are used).
// Controllers for which ctx.IsControllerEnabled returns false are skipped
// with a warning. The visible code returns nil after the loop.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
...
for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
...
// Per the article, every controller is started as a goroutine.
// NOTE(review): initFn itself is called synchronously here; the goroutine is
// presumably spawned inside each init function — confirm against the callee.
debugHandler, started, err := initFn(ctx)
...
}
return nil
}
4. 高可用, 使用的其实也是 endpoint 资源类型, 这里不多说了, 可以参考 [k8s源码分析][client-go] k8s选举leaderelection (分布式资源锁实现) 和 [k8s源码分析][kube-scheduler]scheduler之高可用及原理 .
[root@master kubectl]# ./kubectl get endpoints -n kube-system
NAME ENDPOINTS AGE
kube-controller-manager <none> 22d
kube-scheduler <none> 22d
[root@master kubectl]# ./kubectl get endpoints kube-controller-manager -o yaml -n kube-system
apiVersion: v1
kind: Endpoints
metadata:
annotations:
control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"master_6efbea00-0168-11ea-b452-525400d54f7e","leaseDurationSeconds":15,"acquireTime":"2019-11-07T14:11:21Z","renewTime":"2019-11-07T14:12:01Z","leaderTransitions":15}'
creationTimestamp: "2019-10-15T14:57:16Z"
name: kube-controller-manager
namespace: kube-system
resourceVersion: "131420"
selfLink: /api/v1/namespaces/kube-system/endpoints/kube-controller-manager
uid: 13f00d33-ef5c-11e9-af01-525400d54f7e
[root@master kubectl]#