Apiserver 的 启动流程 -1

kube-apiserver进程的入口类源码位置如下:\kubernetes-master\cmd\kube-apiserver\apiserver.go

Apiserver通过Run方法启动, 主要逻辑为:

  • 调用CreateServerChain构建服务调用链并判断是否启动非安全的httpserver,httpserver链中包含 apiserver要启动的三个server,以及为每个server注册对应资源的路由;
  • 调用server.PrepareRun进行服务运行前的准备,该方法主要完成了健康检查. 存活检查和OpenAPI路由的注册工作;
  • 调用prepared.Run启动server;
    apiserver是K8S最重要的组成部分,不论是命令操作还是通过remote API进行控制,实际都需要经过apiserver。
    apiserver是k8s系统中所有对象的增删改查盯的http/restful式服务端,其中盯是指watch操作。数据最终存储在分布式一致的etcd存储内,apiserver本身是无状态的,提供了这些数据访问的认证鉴权、缓存、api版本适配转换等一系列的功能。
Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{})
  |
  |————1) CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error)
  |                 |  
  |                 |————1) CreateKubeAPIServerConfig(*options.ServerRunOptions, tunneler.Tunneler, *http.Transport) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error)
  |                 |           |  
  |                 |           |——————1) defaultOptions(s *options.ServerRunOptions)  
  |                 |           |——————2) Validate() []error  
  |                 |           |——————3) BuildGenericConfig(s *options.ServerRunOptions, *http.Transport) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error)
  |                 |           |               |
  |                 |           |               |————1) NewConfig(codecs serializer.CodecFactory) *Config :包含DefaultBuildHandlerChain
  |                 |           |               |————2) genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, legacyscheme.Scheme)
  |                 |           |               |————3) genericapiserver.DefaultSwaggerConfig()
  |                 |           |               |————4) BuildStorageFactory(s *options.ServerRunOptions) (*serverstorage.DefaultStorageFactory, error)
  |                 |           |               |————5) s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
  |                 |           |               |————6) BuildAuthenticator
  |                 |           |               |————7) BuildAuthorizer
  |                 |           |               |————8) BuildAdmissionPluginInitializers
  |                 |           |
  |                 |           |——————4) &master.Config=genericapiserver.Config+ExtraConfig
  |                 |                         
  |                 |————2) createAPIExtensionsConfig(genericapiserver.Config, kubeexternalinformers.SharedInformerFactory, *options.ServerRunOptions) (*apiextensionsapiserver.Config, error):设置RESTOptionsGetter
  |                 |————3) createAPIExtensionsServer( *apiextensionsapiserver.Config,  genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error)
  |                 |               |
  |                 |               |————1) apiextensionsConfig.Complete():CompletedConfig
  |                 |               |           |————(c *RecommendedConfig) Complete() CompletedConfig :cfg.GenericConfig.Complete()
  |                 |               |                          |————(c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig: c.Config.Complete(c.SharedInformerFactory)
  |                 |               |————2) (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget):*CustomResourceDefinitions
  |                 |                           |————1) c.GenericConfig.New("apiextensions-apiserver", delegationTarget):*GenericAPIServer
  |                 |                           |            |————1) NewAPIServerHandler(name string, request.RequestContextMapper, runtime.NegotiatedSerializer, HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler
  |                 |                           |            |           |————1) nonGoRestfulMux := genericmux.NewPathRecorderMux(name)
  |                 |                           |            |           |————2) gorestfulContainer := restful.NewContainer()
  |                 |                           |            |————2) &GenericAPIServer
  |                 |                           |            |————3) installAPI(s *GenericAPIServer, c *Config) //第一次装载
  |                 |                           |                        |———— 1) routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
  |                 |                           |                        |———— 2) routes.SwaggerUI{}.Install(s.Handler.NonGoRestfulMux)
  |                 |                           |                        |———— 3) routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
  |                 |                           |                        |———— 4) routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
  |                 |                           |                        |———— 5) routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
  |                 |                           |                        |———— 6) routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
  |                 |                           |                        |———— 7) s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())  
  |                 |                           |————2) genericapiserver.NewDefaultAPIGroupInfo
  |                 |                           |————3) apiGroupInfo里设置VersionedResourcesStorageMap,NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST
  |                 |                           |————4) s.InstallAPIGroup(apiGroupInfo *APIGroupInfo)
  |                 |                           |           |———— (*GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo)
  |                 |                           |                        |———— (g *APIGroupVersion) InstallREST(container *restful.Container)
  |                 |                           |                                       |———— (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error)
  |                 |                           |                                                  |———— (a *APIInstaller) newWebService() *restful.WebService
  |                 |                           |                                                  |———— 最终的API注册过程是在这个函数中完成的,把一个rest.Storage对象转换为实际的getter, lister等处理函数,并和实际的url关联起来。
  |                 |                           |                                                  |———— (a *APIInstaller) registerResourceHandlers(path string, rest.Storage, *restful.WebService, proxyHandler http.Handler) (*metav1.APIResource, error)
  |                 |                           |————5) s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
  |                 |                           |————6) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
  |                 |————4) CreateKubeAPIServer(*master.Config, genericapiserver.DelegationTarget, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory) (*master.Master, error)
  |                 |               |————1) kubeAPIServerConfig.Complete(versionedInformers):master.CompletedConfig:cfg.GenericConfig.Complete(informers)
  |                 |               |                           |————(c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig: c.Config.Complete(c.SharedInformerFactory)
  |                 |               |   //第二次装载 功能性的、主要的api是在m.InstallAPIs()中装载的。
  |                 |               |————2) (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget):*Master
  |                 |                           |————1) c.GenericConfig.New("kube-apiserver", delegationTarget)
  |                 |                           |————2) (m *Master) InstallLegacyAPI(c *completedConfig, generic.RESTOptionsGetter, corerest.LegacyRESTStorageProvider)
  |                 |                           |           |————1) (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error)
  |                 |                           |           |          |———— nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport): //func NewStorage(...就是controller创建
  |                 |                           |           |                       |———— (e *Store) CompleteWithOptions(options *generic.StoreOptions)
  |                 |                           |           |                                   |———— (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource):generic.RESTOptions
  |                 |                           |           |                                               |———— ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
  |                 |                           |           |                                                       |———— 1) s, d := generic.NewRawStorage(storageConfig)
  |                 |                           |           |                                                       |       |———— s, d, err := factory.Create(*config)
  |                 |                           |           |                                                       |               |———— newETCD2Storage(c)
  |                 |                           |           |                                                       |               |———— newETCD3Storage(c)
  |                 |                           |           |                                                       |———— 2) cacher := storage.NewCacherFromConfig(cacherConfig)
  |                 |                           |           |————2) (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo)
  |                 |                           |————3) (m *Master) InstallAPIs(serverstorage.APIResourceConfigSource, generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider)
  |                 |                                                  |———— (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) 
  |                 |————5) kubeAPIServer.GenericAPIServer.PrepareRun()
  |                 |               |————1) routes.Swagger{Config: s.swaggerConfig}.Install(s.Handler.GoRestfulContainer)
  |                 |               |————2) routes.OpenAPI{Config: s.openAPIConfig,}.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
  |                 |               |————3) s.installHealthz()
  |                 |————6) apiExtensionsServer.GenericAPIServer.PrepareRun()
  |                 |————7) createAggregatorConfig(genericapiserver.Config, *options.ServerRunOptions, kubeexternalinformers.SharedInformerFactory, aggregatorapiserver.ServiceResolver, *http.Transport) (*aggregatorapiserver.Config, error)
  |                 |————8) createAggregatorServer(*aggregatorapiserver.Config, genericapiserver.DelegationTarget, apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error)
  |                 |               |————1) aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer):*APIAggregator
  |                 |                           |———— c.GenericConfig.New("kube-aggregator", delegationTarget)
  |                 |————9) BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.Handler
  |                 |————10) NonBlockingRun( *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error 
  |                                 |————serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{})
  |                                         |————server.RunServer(insecureServer, ln, shutDownTimeout, stopCh)
  |
  |————2) (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer
  |
  |————3) (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error
                            |————(s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{})
                                            |———— (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) 设置APIServerHandler为http处理函数
                                                        |———— RunServer(secureServer, s.SecureServingInfo.Listener, s.ShutdownTimeout, stopCh)

关键结构:

ServerRunOptions结构:

路径: cmd/kube-apiserver/app/options/options.go

type ServerRunOptions struct {
    // 重名,下面称为GenericServerRunOptions
    GenericServerRunOptions     *genericoptions.ServerRunOptions    // 服务器通用的运行参数
    AllowPrivileged             bool  // 是否配置超级权限,即允许Pod中运行的容器拥有系统特权
    EventTTL                    time.Duration  // 事件留存事件, 默认1h
    KubeletConfig               kubeletclient.KubeletClientConfig  // K8S kubelet配置
    MaxConnectionBytesPerSec    int64    // 每秒的最大连接数
    // 指定的话,可以通过SSH指定的秘钥文件和用户名对Node进行访问
    SSHKeyfile                  string
    SSHUser                     string
    // 包含PEM-encoded x509 RSA公钥和私钥的文件路径,用于验证Service Account的token
    // 不指定的话,则使用--tls-private-key-file指定的文件
    ServiceAccountKeyFile       string
    // 设置为true时,系统会到etcd验证ServiceAccount token是否存在
    ServiceAccountLookup        bool
    WebhookTokenAuthnConfigFile string
    WebhookTokenAuthnCacheTTL   time.Duration
}

ServerRunOptions结构:

路径: pkg/genericapiserver/options/server_run_options.go

type ServerRunOptions struct {
    // 准入控制,如:"AlwaysAdmit","LimitRanger","ReousrceQuota"等
    AdmissionControl           string      
    // 准入控制的配置文件
    AdmissionControlConfigFile string      
    // 用于广播给集群的所有成员自己的IP地址,不指定的话就使用"--bind-address"的IP地址
    AdvertiseAddress           net.IP    
    // 安全访问的认证模式列表,以逗号分隔,包括:AlwaysAllow、AlwaysDeny、ABAC、Webhook、RBAC
    AuthorizationMode                        string
    // mode设置为ABAC时使用的csv格式的授权配置文件
    AuthorizationPolicyFile                  string
    // 下列跟mode配置成webhook有关
    AuthorizationWebhookConfigFile           string
    AuthorizationWebhookCacheAuthorizedTTL   time.Duration
    AuthorizationWebhookCacheUnauthorizedTTL time.Duration
    // mode设置为RBAC时使用的超级用户名,用该用户名进行RBAC认证
    AuthorizationRBACSuperUser               string

    AnonymousAuth                bool
    // 使用http基本认证的方式访问API Server的安全端口
    BasicAuthFile                string
    // 默认"0.0.0.0",apiServer在该地址的6443端口上开启https服务
    BindAddress                  net.IP
    // TLS证书所在目录,默认"/var/run/kubernetes"
    CertDirectory                string
    // 指定的话,该客户端证书将用于认证过程
    ClientCAFile                 string
    // 下列的云服务商有关
    CloudConfigFile              string
    CloudProvider                string
    // CORS 跨域资源共享
    CorsAllowedOriginList        []string
    // 默认的持久化存储格式,比如"application/json"
    DefaultStorageMediaType      string
    // 指定清理的工作线程数,可以提高清理namespace的效率,但是会增加系统资源的占用
    DeleteCollectionWorkers      int
    // 日志相关策略
    AuditLogPath                 string
    AuditLogMaxAge               int
    AuditLogMaxBackups           int
    AuditLogMaxSize              int
    // 使能GC
    EnableGarbageCollection      bool
    // 打开性能分析,可以通过<host>:<port>/debug/pprof/地址来查看程序栈,线程等信息
    EnableProfiling              bool
    EnableContentionProfiling    bool
    // 使能swaggerUI,访问地址<host>:<port>/swagger-ui
    EnableSwaggerUI              bool
    // 使能watch cache,对所有的watch操作进行缓存
    EnableWatchCache             bool
    // 按资源覆盖etcd服务的设置,以逗号分隔,比如group/resource#servers,其中servers为: http://ip:port
    EtcdServersOverrides         []string
    StorageConfig                storagebackend.Config
    // 用于生成该master对外的URL地址
    ExternalHost                 string
    // 绑定的不安全地址,即8080端口绑定的地址
    InsecureBindAddress          net.IP
    // 非安全端口,默认8080
    InsecurePort                 int
    // 设置keystone鉴权插件地址
    KeystoneURL                  string
    KeystoneCAFile               string
    
    KubernetesServiceNodePort    int
    LongRunningRequestRE         string
    // master数量
    MasterCount                  int
    // 设置master服务所在的namespace,默认为default
    MasterServiceNamespace       string
    // 同时处理的最大请求数,默认为400,超过该请求数将被拒绝。仅用于长时间执行的请求
    MaxRequestsInFlight          int
    // 最小请求处理超时时间,默认1800s,仅用于watch request
    MinRequestTimeout            int
    // 该文件内设置鉴权机构
    OIDCCAFile                   string
    OIDCClientID                 string
    OIDCIssuerURL                string
    OIDCUsernameClaim            string
    OIDCGroupsClaim              string
    RequestHeaderUsernameHeaders []string
    RequestHeaderClientCAFile    string
    RequestHeaderAllowedNames    []string
    // 一组key=value用于运行时的配置信息。api/<groupVersion>/<resource>,用于打开或者关闭对某个API版本的支持
    // api/all和api/legacy特别用于支持所有版本的API或支持旧版本的API
    RuntimeConfig                config.ConfigurationMap
    // https安全端口,默认6443;设置为0,表示不开启https
    SecurePort                   int
    // service的Cluster IP池
    ServiceClusterIPRange        net.IPNet // TODO: make this a list
    // service的NodePort模式下能使用的主机端口号范围,默认是30000--32767
    ServiceNodePortRange         utilnet.PortRange
    // 持久化存储的资源版本号,例如"group1/version1,group2/version2,..."
    StorageVersions              string
    // The default values for StorageVersions. StorageVersions overrides
    // these; you can change this if you want to change the defaults (e.g.,
    // for testing). This is not actually exposed as a flag.
    DefaultStorageVersions string
    TargetRAMMB            int
    // TLS CA文件
    TLSCAFile              string
    // 包含x509证书的文件路径,用于https认证
    TLSCertFile            string
    // 包含x509与tls-cert-file对应的私钥文件路径
    TLSPrivateKeyFile      string
    SNICertKeys            []config.NamedCertKey
    // 用于访问APIServer安全端口的token认证文件路径
    TokenAuthFile          string
    // 使能token
    EnableAnyToken         bool
    // 设置各资源对象watch缓存大小的列表,以逗号分隔,格式为resource#size
    // 前提是EnableWatchCache为true
    WatchCacheSizes        []string
}

ApiServer启动

路径:kubernetes/cmd/kube-apiserver/apiserver.go
入口main()函数:

func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    // 新建一个apiserver对象
    s := options.NewServerRunOptions()
    // 接受用户命令行输入,其实就是自定义上述apiserver对象 
    s.AddFlags(pflag.CommandLine)
    // 解析并格式化用户传入的参数,最后填充APIServer结构体的各成员
    flag.InitFlags()
    // 初始化log配置,包括log输出位置、log等级等。
    logs.InitLogs()
    // 保证了即使apiserver异常崩溃了也能将内存中的log信息保存到磁盘文件中。 
    defer logs.FlushLogs()
    // 如果用户只是想看apiserver的版本号而不是启动apiserver,则打印apiserver的版本号并退出。
    verflag.PrintAndExitIfRequested()

    // 将创建的apiserver对象传入app.Run()中,最终绑定本地端口并绑定本地端口并创建一个HTTP Server与一个HTTPS Server。
    if err := app.Run(s); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

新建一个apiserver对象---NewServerRunOptions():

func NewServerRunOptions() *ServerRunOptions {
    s := ServerRunOptions{
        // 初始化通用的apiserver运行参数,包括etcd后端存储参数
        GenericServerRunOptions: genericoptions.NewServerRunOptions().WithEtcdOptions(),
        // 事件的存储保留时间
        EventTTL:                1 * time.Hour,
        // Node上kubelet的客户端配置
        KubeletConfig: kubeletclient.KubeletClientConfig{
            // kubelet通信端口
            Port: ports.KubeletPort,
            PreferredAddressTypes: []string{
                string(api.NodeHostName),
                string(api.NodeInternalIP),
                string(api.NodeExternalIP),
                string(api.NodeLegacyHostIP),
            },
            // 是否开启https
            EnableHttps: true,
            // HTTP超时
            HTTPTimeout: time.Duration(5) * time.Second,
        },
        // 将webhook token authenticator返回的响应保存在缓存内的时间
        WebhookTokenAuthnCacheTTL: 2 * time.Minute,
    }
    return &s
}

上面的接口在初始化GenericServerRunOptions参数时又调用了genericoptions.NewServerRunOptions().WithEtcdOptions()接口,先来看下与上面接口名字一样的NewServerRunOptions():

func NewServerRunOptions() *ServerRunOptions {
    return &ServerRunOptions{
        // 以逗号作为分隔符的Admission Control插件的排序列表
        AdmissionControl:                         "AlwaysAdmit",
        AnonymousAuth:                            false,
        // 授权模式
        AuthorizationMode:                        "AlwaysAllow",
        AuthorizationWebhookCacheAuthorizedTTL:   5 * time.Minute,
        AuthorizationWebhookCacheUnauthorizedTTL: 30 * time.Second,
        // apiserver绑定的网卡地址
        BindAddress:                              net.ParseIP("0.0.0.0"),
        // 证书目录
        CertDirectory:                            "/var/run/kubernetes",
        // 默认的对象存储类型
        DefaultStorageMediaType:                  "application/json",
        DefaultStorageVersions:                   registered.AllPreferredGroupVersions(),
        DeleteCollectionWorkers:                  1,
        EnableGarbageCollection:                  true,
        EnableProfiling:                          true,
        EnableContentionProfiling:                false,
        EnableWatchCache:                         true,
        // HTTP绑定的IP地址
        InsecureBindAddress:                      net.ParseIP("127.0.0.1"),
        // 不安全端口(HTTP)
        InsecurePort:                             8080,
        LongRunningRequestRE:                     DefaultLongRunningRequestRE,
        // Kubernetes系统中Master的数量
        MasterCount:                              1,
        MasterServiceNamespace:                   api.NamespaceDefault,
        MaxRequestsInFlight:                      400,
        MinRequestTimeout:                        1800,
        // k8s运行时环境配置
        RuntimeConfig:                            make(config.ConfigurationMap),
        // 安全端口
        SecurePort:                               6443,
        ServiceNodePortRange:                     DefaultServiceNodePortRange,
        StorageVersions:                          registered.AllPreferredGroupVersions(),
    }
}

可以看到初始化的时候会有SecurePort、InsecurePort,实际就是对应HTTP、HTTPS的绑定端口。
我们可以看到这里的控制还是很全面的,包括安全控制(CertDirectory, HTTPS默认启动)、权限控制(AdmissionControl,AuthorizationMode)、服务限流控制(MaxRequestsInFlight)等。
具体的参数前面介绍结构体时基本都有提到。
继续后端存储etcd的配置初始化WithEtcdOptions():

func (o *ServerRunOptions) WithEtcdOptions() *ServerRunOptions {
    o.StorageConfig = storagebackend.Config{
        // etcd的默认路径前缀:/registry
        Prefix: DefaultEtcdPathPrefix,
        // 反序列化cache,未设置的话,会根据apiServer的内存限制进行配置
        DeserializationCacheSize: 0,
    }
    return o
}

到这里apiServer的运行参数初始化关键性步骤基本结束,至于后面的s.AddFlags(pflag.CommandLine)就是获取命令行的输入信息,然后进行重新覆盖,这里就不讲了。
可以根据kube-apiserver进程的命令行信息,把命令行传参和结构配置进行对应:

#/usr/bin/kube-apiserver --logtostderr=true --v=0 --etcd-servers=http://test-master:2379 --insecure-bind-address=0.0.0.0 --port=8080 --kubelet-port=10250 --allow-privileged=false --service-cluster-ip-range=10.254.0.0/16 --admission-control=NamespaceLifecycle,NamespaceExists,LimitRanger,SecurityContextDeny,ServiceAccount,ResourceQuota --service-account-key-file=/var/run/kubernetes/apiserver.key

初始化完成之后,最重要的任务就是启动实例了。
所有的操作都是在run函数中执行,app.run()接口实现在cmd/kube-apiserver/app/server.go。
在最后APIServer会启动HTTP/HTTPS服务。

// Run runs the specified APIServer.  This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())

  // 创建调用链
    server, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

  // 进行一些准备工作, 注册一些hander,执行hook等
    prepared, err := server.PrepareRun()
    if err != nil {
        return err
    }

  // 开始执行
    return prepared.Run(stopCh)
}

执行NonBlockingRun k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351

func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
    auditStopCh := make(chan struct{})

    // 1. 判断是否要启动审计日志
    if s.AuditBackend != nil {
        if err := s.AuditBackend.Run(auditStopCh); err != nil {
            return fmt.Errorf("failed to run the audit backend: %v", err)
        }
    }

    // 2. 启动 https server
    internalStopCh := make(chan struct{})
    var stoppedCh <-chan struct{}
    if s.SecureServingInfo != nil && s.Handler != nil {
        var err error
        stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
        if err != nil {
            close(internalStopCh)
            close(auditStopCh)
            return err
        }
    }

    go func() {
        <-stopCh
        close(s.readinessStopCh)
        close(internalStopCh)
        if stoppedCh != nil {
            <-stoppedCh
        }
        s.HandlerChainWaitGroup.Wait()
        close(auditStopCh)
    }()

    // 3. 执行 postStartHooks
    s.RunPostStartHooks(stopCh)

    // 4. 向 systemd 发送 ready 信号
    if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
        klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
    }

    return nil
}

pkg目录下
admission 提供了一些权限管理的控制器
apis 是由code-generator自动生成的一些api包装器。
audit 是授权信息提取模块。给其他服务组件用的。
authentication 是角色管理的组件
authorization 是认证模块
endpoinsts 处理请求的http/https端点。
features
registry 跟etcd打交道的
server apiserver的 web模块
storage 跟etcd打交道的
utils
plugin/pkg


image.png

CreateServerChain

首先是创建ServerChain

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  // nodetunneler与node通信,proxy实现代理功能,转发请求给其他apiservice
  // apiserver到cluster的通信可以通过三种方法
  // apiserver到kubelet的endpoint,用于logs功能,exec功能,port-forward功能
  // HTTP连接,即使可以用HTTPS也不做任何其他校验,并不安全
  // ssh tunnel,不推荐使用

  nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    // 1. 为 kubeAPIServer 创建配置
    kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err :=                                         CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // 2. 判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig 
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers,        pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,vc
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    if err != nil {
        return nil, err
    }
    
    // 3. 初始化 APIExtensionsServer, 通过一个空的delegate初始化
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    // 4. 初始化 KubeAPIServer
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
    if err != nil {
        return nil, err
    }
    
    // 5. 创建 AggregatorConfig
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.          ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    
    // 6. 初始化 AggregatorServer
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        return nil, err
    }

    // 7. 判断是否启动非安全端口的 http server
    if insecureServingInfo != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }
    return aggregatorServer, nil
}

这部分代码比较清晰,CreateNodeDialer是根据配置生成ssh tunneler,用以连接各个node的kubelet。另外就是创建了3个config和3个server:

config:
  • controlplane.Config
  • apiextensionsapiserver.Config
  • aggregatorapiserver.Config
server:
  • apiextensionsapiserver.CustomResourceDefinitions
  • kubeAPIServer.kubeAPIServer
  • aggregatorapiserver.APIAggregator

config和server的对应关系应当一目明了吧,这里是按照创建顺序书写的。

controlplane.Config的配置是相当的多,包含genericapiserver.Config和ExtraConfig, genericapiserver.Config的注释里面也说了,在代码里是按照重要级别排序的,前面几个分别是server配置、验证配置、授权配置。ExtraConfig主要包含的是apiserver额外的信息:集群认证信息、etcd配置等。

type Config struct {
    GenericConfig *genericapiserver.Config
    ExtraConfig   ExtraConfig
}

这里主要方法是CreateKubeAPIServerConfig,该方法返回controlplane.Config:

创建 apiserver 通用配置

在 CreateServerChain 函数中,基本可以从函数名猜出来每一步在干什么,通用配置的创建就是在函数 CreateKubeAPIServerConfig() 中完成的。进入到以下函数中看一下详细实现:CreateKubeAPIServerConfig() --> buildGenericConfig()。
创建通用配置流程主要有以下几步:
1.GenericConfig 实例化
代码如下:

#buildGenericConfig
func buildGenericConfig(
    ...
    #默认生成api文档
    genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
    ...
    #采用etcd作为存储方案
    completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
    #认证机制
    if lastErr = s.Authentication.ApplyTo
    #授权机制
    genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
    #准入机制
    err = s.Admission.ApplyTo()
}

2.StorageFactoryConfig
apiserver 组件使用 etcd 作为集群的存储,系统中所使用的所有资源、集群状态、配置等都在这上面保存。代码部分如下:

// 初始化 storageFactoryConfig 配置对象。
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
// 初始化 etcd 相关的配置信息,补全配置对象,返回 completedStorageFactoryConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
    lastErr = err
    return
}
// 根据上面补完的配置信息,创建 storageFactory 对象
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
    return
}

3.Authorizer认证、授权配置
作为整个系统的存储对象交互入口,每个系统的请求都需要经过认证、授权、准入控制器这些阶段,准入控制器下面再说,涉及到的代码如下:

认证机制

s.Authentication.ApplyTo

func (o *BuiltInAuthenticationOptions) ApplyTo(authInfo *genericapiserver.AuthenticationInfo, secureServing *genericapiserver.SecureServingInfo, egressSelector *egressselector.EgressSelector, openAPIConfig *openapicommon.Config, extclient kubernetes.Interface, versionedInformer informers.SharedInformerFactory){
    ...
    #实例化Config
    authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
    ...
}

authenticatorConfig.New

func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
#重点关注authenticators 和 tokenAuthenticators两个变量
var authenticators []authenticator.Request
var tokenAuthenticators []authenticator.Token
#添加requestHeader认证方式
if config.RequestHeaderConfig != nil {
requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
...
)
#追加认证方式
authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
}
#添加CLientCA认证方式
if config.ClientCAContentProvider != nil {
...
}
#添加Token认证方式
if len(config.TokenAuthFile) > 0 {
tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)

}
#...其他各种认证方式
#如果没有认证方式 则启动anonymous
if len(authenticators) == 0 {
    if config.Anonymous {
        return anonymous.NewAuthenticator(), &securityDefinitions, nil
    }
    return nil, &securityDefinitions, nil
}
#整合两种认证方式authenticators 和 tokenAuthenticators
authenticator := union.New(authenticators...)
}

return authenticator, &securityDefinitions, nil

}

``

授权机制

genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
调用此函数 authorizationConfig.New()

New
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  case modes.ModeNode:
  ...
  const (
  ...
  #对生产来说最有用出的模式RBAC 角色-用户模式
  ModeRBAC string = "RBAC"
  ...
)

New 函数主要就是在 for 循环中根据 config.AuthorizationModes 配置了 authorizers 和 ruleResolvers 两个变量,这个 config.AuthorizationModes 是在最初在初始化配置对象执行 options.NewServerRunOptions() 的时候赋值的,具体路径及代码如下:
k8s.io/kubernetes/pkg/kubeapiserver/options/authorization.go

func NewBuiltInAuthorizationOptions() *BuiltInAuthorizationOptions {
    return &BuiltInAuthorizationOptions{
        // 初始赋值就一个 "AlwaysAllow" 字符串,这是默认的配置。
        Modes:                       []string{authzmodes.ModeAlwaysAllow},
        WebhookVersion:              "v1beta1",
        WebhookCacheAuthorizedTTL:   5 * time.Minute,
        WebhookCacheUnauthorizedTTL: 30 * time.Second,
        WebhookRetryBackoff:         genericoptions.DefaultAuthWebhookRetryBackoff(),
    }
}

在回到New()中,在函数最后返回了认证和授权对象。

return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
authorizers 是已启用的认证器列表,union.New将它合并成一个认证器。

ruleResolvers 是已启用的规则解析器,union.NewRuleResolvers 也是合并了一下

可以看到默认的授权是 AlwaysAllow,具体其它类型可以在启动的时候在配置里面设置,只要配置了,就会实例化该授权对象,认证的时候会遍历每一个授权器,有一个认证成功就ok。

CreateKubeAPIServerConfig该方法创建了apiserver所需要的所有资源,并将ServerRunOptions里面的参数转化为genericapiserver.Config和ExtraConfig。

创建APIExtensionConfig

创建服务链的第二步是创建ApiExtensionConfig, ApiExtensionConfig与ApiServerConfig几乎一致,在ExtraConfig上是不同的, ApiExtensionConfig的ExtraConfig只有几个字段,少很多。创建ApiExtensionConfig的方法是createAPIExtensionsConfig, 该方法主要做了四件事:

  • 将ApiServerConfig中的GenericConfig进行了浅拷贝。
  • 重写GenericConfig的AdmissionControl,因为ApiExtentionServer需要用自己的scheme。
  • 拷贝了etcd的配置,避免相互影响。
  • 重写MergedResourceConfig。

创建APIExtensionServer

当创建好APIExtensionConfig之后,就是创建APIExtensionServer,调用方法如下:
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
作为request 处理链的最后一环,这里传了一个空的DelegateTarget,和后面创建KubeApiServer不一样,创建KubeApiServer传入了APIExtensionServer的GenericAPIServer。回到createAPIExtensionsServer方法,最终会调用到apiextension-apiserver/apiserver#New()

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    s := &CustomResourceDefinitions{
        GenericAPIServer: genericServer,
    }

    ... ...

    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }

    ... ...


    delegateHandler := delegationTarget.UnprotectedHandler()
    if delegateHandler == nil {
        delegateHandler = http.NotFoundHandler()
    }

    versionDiscoveryHandler := &versionDiscoveryHandler{
        discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
        delegate:  delegateHandler,
    }
    groupDiscoveryHandler := &groupDiscoveryHandler{
        discovery: map[string]*discovery.APIGroupHandler{},
        delegate:  delegateHandler,
    }
    establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    crdHandler, err := NewCustomResourceDefinitionHandler(
        versionDiscoveryHandler,
        groupDiscoveryHandler,
        s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
        delegateHandler,
        c.ExtraConfig.CRDRESTOptionsGetter,
        c.GenericConfig.AdmissionControl,
        establishingController,
        c.ExtraConfig.ServiceResolver,
        c.ExtraConfig.AuthResolverWrapper,
        c.ExtraConfig.MasterCount,
        s.GenericAPIServer.Authorizer,
        c.GenericConfig.RequestTimeout,
        time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
        apiGroupInfo.StaticOpenAPISpec,
        c.GenericConfig.MaxRequestBodyBytes,
    )
    if err != nil {
        return nil, err
    }
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

    crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
    namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    finalizingController := finalizer.NewCRDFinalizer(
        s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
        crdClient.Apiextensions(),
        crdHandler,
    )
    var openapiController *openapicontroller.Controller
    if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
        openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
    }

    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
        s.Informers.Start(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
        // OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
        // Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
        // choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
        // and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
        if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) && s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
            go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
        }

        go crdController.Run(context.StopCh)
        go namingController.Run(context.StopCh)
        go establishingController.Run(context.StopCh)
        go nonStructuralSchemaController.Run(5, context.StopCh)
        go apiApprovalController.Run(5, context.StopCh)
        go finalizingController.Run(5, context.StopCh)
        return nil
    })
    // we don't want to report healthy until we can handle all CRDs that have already been registered.  Waiting for the informer
    // to sync makes sure that the lister will be valid before we begin.  There may still be races for CRDs added after startup,
    // but we won't go healthy until we can handle the ones already present.
    s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
        return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
            return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
        }, context.StopCh)
    })

    return s, nil
}

apiextensions-apiserver/apiserver.go这个文件几乎只写了这个New方法。

  • New方法的首先是通过GenericConfig New了一个GenericServer,该Server将GenericConfig中的各个配置转化为自己的属性,同时New了APIServerHandler, APIServerHandler包含APIServer使用的各种http Handler: FullHandlerChain、GoRestfulContainer、NonGoRestfulMux、Director。
    其次是暴露API Group。APIServer中是有各种不同的API Group, 比如networking.k8s.io/v1beta1、apps/v1beta1、metrics.k8s.io/v1beta1等。ExtensionApiServer主要是一些自定义的API Group。
  • NewCustomResourceDefinitionHandler 为上面GenericServer中的Handlers赋值。处理后续的API请求。
    实例化各种Controller,包括:NewDiscoveryController、NewNamingConditionController、NewConditionController、NewKubernetesAPIApprovalPolicyConformantConditionController、NewCRDFinalizer、openapicontroller。每个controller完成各自对应的业务逻辑。
  • 添加PostStart Hook, 当APIServer启动之后,异步协程调用各个Controller的Run方法。

APIExtensionsServer最先初始化,在调用链的末尾, 处理CR、CRD相关资源.
其中包含的 controller 以及功能如下所示:

  • openapiController:将 crd 资源的变化同步至提供的 OpenAPI 文档,可通过访问 /openapi/v2 进行查看;
  • crdController:负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者的信息可通过 kubectl api-versions 和 kubectl api-resources 查看;
  • namingController:检查 crd obj 中是否有命名冲突,可在 crd .status.conditions 中查看;
  • establishingController:检查 crd 是否处于正常状态,可在 crd .status.conditions 中查看;
  • nonStructuralSchemaController:检查 crd obj 结构是否正常,可在 crd .status.conditions 中查看;
  • apiApprovalController:检查 crd 是否遵循 kubernetes API 声明策略,可在 crd .status.conditions 中查看;
  • finalizingController:类似于 finalizes 的功能,与 CRs 的删除有关;

c.GenericConfig.New来初始化genericapiserver,包裹一些默认链,创建handler

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
    if c.Serializer == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
    }
    if c.LoopbackClientConfig == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil")
    }
    if c.EquivalentResourceRegistry == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.EquivalentResourceRegistry == nil")
    }

    // 包裹了DefaultBuildHandlerChain
    handlerChainBuilder := func(handler http.Handler) http.Handler {
        return c.BuildHandlerChainFunc(handler, c.Config)
    }
    // 创建apiserverhandler
    apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

    ...

    return s, nil
}

APIServerHandler包含多种http.Handler类型,包括go-restful以及non-go-restful,以及在以上两者之间选择的Director对象,go-restful用于处理已经注册的handler,non-go-restful用来处理不存在的handler,API URI处理的选择过程为:FullHandlerChain-> Director ->{GoRestfulContainer, NonGoRestfulMux}。NewAPIServerHandler`

unc NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
    // non-go-restful路由
    nonGoRestfulMux := mux.NewPathRecorderMux(name)
    if notFoundHandler != nil {
        nonGoRestfulMux.NotFoundHandler(notFoundHandler)
    }

    // go-resetful路由
    gorestfulContainer := restful.NewContainer()
    gorestfulContainer.ServeMux = http.NewServeMux()
    gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
    gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
        logStackOnRecover(s, panicReason, httpWriter)
    })
    gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
        serviceErrorHandler(s, serviceErr, request, response)
    })

    // 选择器, 根据path选择是否执行go-restful,注册过的path执行go-restful
    director := director{
        name:               name,
        goRestfulContainer: gorestfulContainer,
        nonGoRestfulMux:    nonGoRestfulMux,
    }

    return &APIServerHandler{
        FullHandlerChain:   handlerChainBuilder(director),
        GoRestfulContainer: gorestfulContainer,
        NonGoRestfulMux:    nonGoRestfulMux,
        Director:           director,
    }
}

以上是APIExtensionsServer的初始化流程,初始化Server, 调用s.GenericAPIServer.InstallAPIGroup注册api。此方法的调用链非常深,主要是为了将需要暴露的API Resource注册到 server 中,以便能通过 http 接口进行 resource 的 REST 操作,其他几种 server 在初始化时也都会执行对应的 InstallAPI方法

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容