Kubernetes: kube-apiserver startup flow (Part 1)
Source: cnblogs  Author: lubanseven  Date: 2023/10/25 13:55:56

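0. Preface

The previous two articles, "Kubernetes: kube-apiserver scheme (Part 1)" and "Kubernetes: kube-apiserver scheme (Part 2)", focused on scheme, the resource registry in kube-apiserver. This article gets to the main topic and begins introducing the core implementation of kube-apiserver.

1. kube-apiserver startup flow

kube-apiserver uses Cobra as its CLI framework. Its initialization is shown in the diagram below.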

[Figure: kube-apiserver initialization diagram]

The initialization process is easier to follow with the diagram and the code side by side. The code is in kubernetes/cmd/kube-apiserver/apiserver.go.

    // kubernetes/cmd/kube-apiserver/apiserver.go
    package main

    func main() {
        command := app.NewAPIServerCommand()
        code := cli.Run(command)
        os.Exit(code)
    }

    // kubernetes/cmd/kube-apiserver/app/server.go
    func NewAPIServerCommand() *cobra.Command {
        s := options.NewServerRunOptions()
        cmd := &cobra.Command{
            Use: "kube-apiserver",
            ...
            RunE: func(cmd *cobra.Command, args []string) error {
                // set default options
                completedOptions, err := s.Complete()
                if err != nil {
                    return err
                }

                // validate options
                if errs := completedOptions.Validate(); len(errs) != 0 {
                    return utilerrors.NewAggregate(errs)
                }

                return Run(completedOptions, genericapiserver.SetupSignalHandler())
            },
        }

        // parse flags to options
        fs := cmd.Flags()
        namedFlagSets := s.Flags()
        verflag.AddFlags(namedFlagSets.FlagSet("global"))
        ...

        return cmd
    }

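First, options.NewServerRunOptions() instantiates the options. s.Complete() then fills in the defaults, and the completed options are passed to Validate() for validation. Once validation succeeds, execution enters Run(completedOptions, genericapiserver.SetupSignalHandler()). Run() does not return during normal operation: kube-apiserver runs inside it until the stop channel is closed.

Note that the kube-apiserver command-line parameters are parsed from flags and assigned to options. This is standard usage of the framework, so it is not covered further here.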
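The flags, Complete(), Validate() sequence can be illustrated with a minimal sketch. It assumes nothing from Kubernetes: it uses the standard library `flag` package instead of Cobra, and the `ServerRunOptions` struct, its two fields, and `parseOptions` are hypothetical names chosen for illustration only.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// ServerRunOptions is a hypothetical, heavily simplified options struct.
type ServerRunOptions struct {
	SecurePort  int
	EtcdServers string
}

// AddFlags binds command-line flags to the option fields, so that parsing
// the flags fills in the struct.
func (o *ServerRunOptions) AddFlags(fs *flag.FlagSet) {
	fs.IntVar(&o.SecurePort, "secure-port", 0, "port to serve HTTPS on")
	fs.StringVar(&o.EtcdServers, "etcd-servers", "", "comma-separated etcd endpoints")
}

// Complete fills in defaults for values the user did not set.
func (o *ServerRunOptions) Complete() {
	if o.SecurePort == 0 {
		o.SecurePort = 6443
	}
}

// Validate collects every problem it finds instead of stopping at the first.
func (o *ServerRunOptions) Validate() []error {
	var errs []error
	if o.SecurePort < 1 || o.SecurePort > 65535 {
		errs = append(errs, fmt.Errorf("invalid secure port %d", o.SecurePort))
	}
	if o.EtcdServers == "" {
		errs = append(errs, errors.New("--etcd-servers must be set"))
	}
	return errs
}

// parseOptions runs the sequence: parse flags, complete, validate.
func parseOptions(args []string) (*ServerRunOptions, error) {
	o := &ServerRunOptions{}
	fs := flag.NewFlagSet("demo-apiserver", flag.ContinueOnError)
	o.AddFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	o.Complete()
	if errs := o.Validate(); len(errs) != 0 {
		return nil, errors.Join(errs...) // aggregate all validation errors
	}
	return o, nil
}

func main() {
	o, err := parseOptions([]string{"--etcd-servers=http://127.0.0.1:2379"})
	if err != nil {
		fmt.Println("invalid options:", err)
		return
	}
	fmt.Println("secure port after Complete():", o.SecurePort)
}
```

Returning a slice of errors from Validate() and aggregating them afterwards lets a user see all misconfigured flags in one run, which is the same idea as utilerrors.NewAggregate(errs) in the excerpt above.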

Now step into Run().

    func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
        // instantiate the kube-apiserver configuration
        config, err := NewConfig(opts)
        if err != nil {
            return err
        }

        // fill in the default configuration
        completed, err := config.Complete()
        if err != nil {
            return err
        }

        // create the server chain
        server, err := CreateServerChain(completed)
        if err != nil {
            return err
        }

        prepared, err := server.PrepareRun()
        if err != nil {
            return err
        }

        return prepared.Run(stopCh)
    }

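As the comments show, the kube-apiserver startup flow inside Run() is quite clear: instantiate the configuration, complete it, create the server chain, prepare the server, and run it.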
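The last step, prepared.Run(stopCh), blocks until the stop channel is closed. The following is a minimal sketch of that pattern only. `preparedServer`, its `ticks` counter, and `runDemo` are hypothetical; a timer stands in for the signal handler that would normally close the channel.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// preparedServer is a hypothetical stand-in for the value PrepareRun() returns.
type preparedServer struct {
	ticks int // number of work iterations performed, for demonstration only
}

// Run starts background work and blocks until stopCh is closed.
func (p *preparedServer) Run(stopCh <-chan struct{}) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				p.ticks++ // placeholder for serving requests
			case <-stopCh:
				return
			}
		}
	}()

	<-stopCh  // block here for the lifetime of the server
	wg.Wait() // let the background work finish before returning
	return nil
}

// runDemo closes stopCh after a short delay, standing in for a signal handler.
func runDemo() (int, error) {
	p := &preparedServer{}
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stopCh)
	}()
	err := p.Run(stopCh)
	return p.ticks, err
}

func main() {
	ticks, err := runDemo()
	fmt.Println("work iterations:", ticks, "error:", err)
}
```

Closing a channel, rather than sending on it, wakes every goroutine that waits on it, which is why a single stop channel can be shared by all components of a server.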

The following sections go through these steps one by one.

1.1 Instantiating the configuration

Step into NewConfig(opts) to see how config is instantiated.

    // kubernetes/cmd/kube-apiserver/app/config.go
    func NewConfig(opts options.CompletedOptions) (*Config, error) {
        // instantiate Config from the options
        c := &Config{
            Options: opts,
        }

        // create the controlPlane configuration
        controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
        if err != nil {
            return nil, err
        }
        c.ControlPlane = controlPlane

        // create the apiExtensions configuration
        apiExtensions, err := apiserver.CreateAPIExtensionsConfig(
            *controlPlane.GenericConfig,
            controlPlane.ExtraConfig.VersionedInformers,
            pluginInitializer,
            opts.CompletedOptions,
            opts.MasterCount,
            serviceResolver,
            webhook.NewDefaultAuthenticationInfoResolverWrapper(
                controlPlane.ExtraConfig.ProxyTransport,
                controlPlane.GenericConfig.EgressSelector,
                controlPlane.GenericConfig.LoopbackClientConfig,
                controlPlane.GenericConfig.TracerProvider,
            ),
        )
        if err != nil {
            return nil, err
        }
        c.ApiExtensions = apiExtensions

        // create the aggregator configuration
        aggregator, err := createAggregatorConfig(
            *controlPlane.GenericConfig,
            opts.CompletedOptions,
            controlPlane.ExtraConfig.VersionedInformers,
            serviceResolver,
            controlPlane.ExtraConfig.ProxyTransport,
            controlPlane.ExtraConfig.PeerProxy,
            pluginInitializer,
        )
        if err != nil {
            return nil, err
        }
        c.Aggregator = aggregator

        return c, nil
    }

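Serving all of kube-apiserver's REST services from a single component would be extremely complex, so kube-apiserver splits them up and decouples them into three HTTP servers: KubeAPIServer, APIExtensionsServer, and AggregatorServer.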

[Figure: the three HTTP servers KubeAPIServer, APIExtensionsServer, and AggregatorServer]

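Each of the three HTTP servers has its own configuration. This article uses APIExtensionsServer as the example and follows its startup flow; the other two HTTP servers are similar.

Step into CreateKubeAPIServerConfig(opts).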

    func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
        *controlplane.Config,
        aggregatorapiserver.ServiceResolver,
        []admission.PluginInitializer,
        error,
    ) {
        // create the generic configuration
        genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
            opts.CompletedOptions,
            []*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
            generatedopenapi.GetOpenAPIDefinitions,
        )
        ...

        config := &controlplane.Config{
            GenericConfig: genericConfig,
            ExtraConfig: controlplane.ExtraConfig{
                ...
            },
        }

        // setup admission
        admissionConfig := &kubeapiserveradmission.Config{
            ExternalInformers:    versionedInformers,
            LoopbackClientConfig: genericConfig.LoopbackClientConfig,
            CloudConfigFile:      opts.CloudProvider.CloudConfigFile,
        }
        err = opts.Admission.ApplyTo(
            genericConfig,
            versionedInformers,
            clientgoExternalClient,
            dynamicExternalClient,
            utilfeature.DefaultFeatureGate,
            pluginInitializers...)
        if err != nil {
            return nil, nil, nil, fmt.Errorf("failed to apply admission: %w", err)
        }
        ...

        return config, serviceResolver, pluginInitializers, nil
    }

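The three HTTP servers naturally share some configuration. Inside CreateKubeAPIServerConfig(opts), kube-apiserver calls BuildGenericConfig() to create this generic configuration for the HTTP servers.

Once the generic configuration has been created, the KubeAPIServer configuration config is instantiated around it. Next, the admission configuration is instantiated and applied to genericConfig, which config wraps, through opts.Admission.ApplyTo().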
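In NewConfig above, the generic configuration is handed to the other two servers as *controlPlane.GenericConfig, that is, by value. The sketch below shows what that buys under simplified assumptions: all type names, fields, and the storage prefix values are hypothetical and only illustrate how a copied struct can be adjusted per server without touching the original.

```go
package main

import "fmt"

// GenericConfig holds settings that every server needs (hypothetical fields).
type GenericConfig struct {
	BindAddress   string
	StoragePrefix string
}

// ControlPlaneConfig and APIExtensionsConfig each wrap a generic configuration.
type ControlPlaneConfig struct {
	GenericConfig *GenericConfig
}

type APIExtensionsConfig struct {
	GenericConfig *GenericConfig
}

// newAPIExtensionsConfig receives the generic configuration by value, so the
// override below only affects its own copy.
func newAPIExtensionsConfig(generic GenericConfig) *APIExtensionsConfig {
	generic.StoragePrefix = "/registry/apiextensions" // hypothetical override
	return &APIExtensionsConfig{GenericConfig: &generic}
}

// buildConfigs creates the shared configuration once and derives both
// server configurations from it.
func buildConfigs() (*ControlPlaneConfig, *APIExtensionsConfig) {
	generic := &GenericConfig{BindAddress: "0.0.0.0:6443", StoragePrefix: "/registry"}
	controlPlane := &ControlPlaneConfig{GenericConfig: generic}
	apiExtensions := newAPIExtensionsConfig(*controlPlane.GenericConfig)
	return controlPlane, apiExtensions
}

func main() {
	cp, ext := buildConfigs()
	fmt.Println(cp.GenericConfig.StoragePrefix)  // unchanged original
	fmt.Println(ext.GenericConfig.StoragePrefix) // overridden copy
	fmt.Println(ext.GenericConfig.BindAddress)   // inherited from the shared config
}
```

Note that a struct copy in Go is shallow: pointer, map, and slice fields would still be shared between the copies, so only plain fields are safe to override this way.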

Step into BuildGenericConfig to see what the generic configuration contains.

    func BuildGenericConfig(
        s controlplaneapiserver.CompletedOptions,
        schemes []*runtime.Scheme,
        getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
    ) (
        genericConfig *genericapiserver.Config,
        versionedInformers clientgoinformers.SharedInformerFactory,
        storageFactory *serverstorage.DefaultStorageFactory,
        lastErr error,
    ) {
        // NewConfig returns a Config struct with the default values
        genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
        genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()

        // ApplyTo applies the run options to the method receiver and returns self
        if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
            return
        }

        // wrap the definitions to revert any changes from disabled features
        getOpenAPIDefinitions = openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(getOpenAPIDefinitions)
        namer := openapinamer.NewDefinitionNamer(schemes...)
        genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
        genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
        genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
        genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"

        // New returns a new storage factory created from the completed storage factory configuration.
        storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
        storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
        storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
        if lastErr != nil {
            return
        }

        // ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
        if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
            return
        }

        // Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
        if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
            return
        }

        // BuildAuthorizer constructs the authorizer
        genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
        if err != nil {
            lastErr = fmt.Errorf("invalid authorization config: %v", err)
            return
        }
        ...

        return
    }

The generic configuration sets up a series of sub-configurations, summarized below.

[Figure: overview of the generic configuration]

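This article does not dig further into each of these configurations; they will be revisited later as needed. With the configuration created, return to Run() to see how the server chain is created.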
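BuildGenericConfig follows a recurring shape: each option group has an ApplyTo method that copies its settings into the configuration, and a named result lastErr lets every step stop early with a bare return. The following is a minimal sketch of that shape. `Config`, `ServingOptions`, `EtcdOptions`, and `buildGenericConfig` are hypothetical simplifications, not the Kubernetes types.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a hypothetical, simplified generic server configuration.
type Config struct {
	SecurePort  int
	EtcdServers []string
}

// ServingOptions is a hypothetical option group that knows how to apply
// itself to a Config.
type ServingOptions struct{ Port int }

func (o ServingOptions) ApplyTo(c *Config) error {
	if o.Port < 1 || o.Port > 65535 {
		return fmt.Errorf("invalid port %d", o.Port)
	}
	c.SecurePort = o.Port
	return nil
}

// EtcdOptions is a second hypothetical option group.
type EtcdOptions struct{ Servers []string }

func (o EtcdOptions) ApplyTo(c *Config) error {
	if len(o.Servers) == 0 {
		return errors.New("no etcd servers configured")
	}
	// Copy the slice so the Config does not alias the options.
	c.EtcdServers = append([]string(nil), o.Servers...)
	return nil
}

// buildGenericConfig applies each option group in order. The named result
// lastErr allows every failing step to exit with a bare return.
func buildGenericConfig(serving ServingOptions, etcd EtcdOptions) (cfg *Config, lastErr error) {
	cfg = &Config{}
	if lastErr = serving.ApplyTo(cfg); lastErr != nil {
		return
	}
	if lastErr = etcd.ApplyTo(cfg); lastErr != nil {
		return
	}
	return
}

func main() {
	cfg, err := buildGenericConfig(
		ServingOptions{Port: 6443},
		EtcdOptions{Servers: []string{"http://127.0.0.1:2379"}},
	)
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Println(cfg.SecurePort, cfg.EtcdServers)
}
```

Keeping the ApplyTo logic next to each option group means the builder function stays a flat list of steps, which is easy to reorder when one step depends on the result of another.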

1.2 Creating the server chain

Step into CreateServerChain to see the server chain.

    func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
        // New returns an HTTP handler that is meant to be executed at the end of the delegation chain.
        // It checks if the request have been made before the server has installed all known HTTP paths.
        // In that case it returns a 503 response otherwise it returns a 404.
        notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)

        // New returns a new instance of CustomResourceDefinitions from the given config.
        apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
        if err != nil {
            return nil, err
        }
        crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

        kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
        if err != nil {
            return nil, err
        }

        // aggregator comes last in the chain
        aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
        if err != nil {
            // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
            return nil, err
        }

        return aggregatorServer, nil
    }

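The server chain creates the three HTTP servers. The walkthrough continues with apiExtensionsServer.

First, notFoundHandler is wrapped in an empty delegate and passed to apiExtensionsServer as its delegation target. When a request does not match any registered API route, it is routed to notFoundHandler.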
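The delegation order in CreateServerChain (aggregator, then kube-apiserver, then apiextensions, then the not-found handler) can be sketched with plain net/http handlers. This is an illustration under simplifying assumptions: `newServer`, `buildChain`, and `serve` are hypothetical helpers, and the path prefixes are examples that do not list everything each real server handles.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newServer returns a handler that answers requests under its own prefixes
// and hands every other request to its delegate.
func newServer(name string, prefixes []string, delegate http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		for _, p := range prefixes {
			if strings.HasPrefix(r.URL.Path, p) {
				fmt.Fprintf(w, "served by %s", name)
				return
			}
		}
		delegate.ServeHTTP(w, r)
	})
}

// buildChain wires the servers in the same order as CreateServerChain: the
// innermost server is created first and the aggregator last.
func buildChain() http.Handler {
	notFound := http.NotFoundHandler()
	apiExtensions := newServer("apiextensions", []string{"/apis/apiextensions.k8s.io/"}, notFound)
	kubeAPI := newServer("kube-apiserver", []string{"/api/"}, apiExtensions)
	aggregator := newServer("aggregator", []string{"/apis/apiregistration.k8s.io/"}, kubeAPI)
	return aggregator
}

// serve sends a GET request through the handler and reports status and body.
func serve(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, path := range []string{
		"/api/v1/pods",
		"/apis/apiextensions.k8s.io/v1/customresourcedefinitions",
		"/no/such/path",
	} {
		code, body := serve(chain, path)
		fmt.Println(path, code, strings.TrimSpace(body))
	}
}
```

Because each server only needs to know its immediate delegate, a new server can be added to the chain without changing the ones already in it.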

Step into config.ApiExtensions.New.

    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
        // create the generic APIServer
        genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
        if err != nil {
            return nil, err
        }

        // instantiate the APIExtensions server: CustomResourceDefinitions
        s := &CustomResourceDefinitions{
            GenericAPIServer: genericServer,
        }

        // create apiGroupInfo, which is used to route REST APIs to resource instances
        apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
        storage := map[string]rest.Storage{}

        // customresourcedefinitions
        if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
            // instantiate the REST resource: customResourceDefinitionStorage
            customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
            if err != nil {
                return nil, err
            }
            storage[resource] = customResourceDefinitionStorage
            storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
        }
        if len(storage) > 0 {
            // map the version to its REST resources
            apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
        }

        // install the routes from REST APIs to resource instances through apiGroupInfo
        if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
            return nil, err
        }
        ...
    }

The flow defined in config.ApiExtensions.New is as described in the comments. The following sections expand each step in turn.
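The role of apiGroupInfo, a mapping from version to resource name to storage from which routes are derived, can be sketched as follows. This is a simplified model, not the InstallAPIGroup implementation: `Storage`, `namedStorage`, `routePath`, and `crdGroupInfo` are hypothetical, and the sketch assumes cluster-scoped resources whose subresource keys have the form "resource/subresource".

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Storage is a hypothetical stand-in for rest.Storage.
type Storage interface{ Describe() string }

type namedStorage string

func (s namedStorage) Describe() string { return string(s) }

// APIGroupInfo is a trimmed-down model: version -> resource key -> storage.
type APIGroupInfo struct {
	Group                        string
	VersionedResourcesStorageMap map[string]map[string]Storage
}

// routePath turns a storage key into a URL path. A key such as
// "things/status" names a subresource that lives below a single object.
func routePath(group, version, key string) string {
	base := "/apis/" + group + "/" + version + "/"
	if resource, sub, ok := strings.Cut(key, "/"); ok {
		return base + resource + "/{name}/" + sub
	}
	return base + key
}

// installAPIGroup derives one route per (version, resource key) pair.
func installAPIGroup(info *APIGroupInfo) map[string]Storage {
	routes := map[string]Storage{}
	for version, resources := range info.VersionedResourcesStorageMap {
		for key, storage := range resources {
			routes[routePath(info.Group, version, key)] = storage
		}
	}
	return routes
}

// crdGroupInfo mirrors the storage map built in the excerpt above.
func crdGroupInfo() *APIGroupInfo {
	resource := "customresourcedefinitions"
	storage := map[string]Storage{
		resource:             namedStorage("crd"),
		resource + "/status": namedStorage("crd status"),
	}
	info := &APIGroupInfo{
		Group:                        "apiextensions.k8s.io",
		VersionedResourcesStorageMap: map[string]map[string]Storage{},
	}
	if len(storage) > 0 {
		info.VersionedResourcesStorageMap["v1"] = storage
	}
	return info
}

func main() {
	routes := installAPIGroup(crdGroupInfo())
	paths := make([]string, 0, len(routes))
	for p := range routes {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	for _, p := range paths {
		fmt.Println(p, "->", routes[p].Describe())
	}
}
```

Keeping the storage map separate from route installation means a resource can be switched off simply by not adding its entry, as the ResourceEnabled check in the excerpt does.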

1.2.1 Creating the generic APIServer

Step into c.GenericConfig.New to see how the generic APIServer is created.

    func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
        // create the handler of the generic APIServer
        apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

        // instantiate the generic APIServer and assign it the apiServerHandler created above
        s := &GenericAPIServer{
            Handler:            apiServerHandler,
            listedPathProvider: apiServerHandler,
        }

        // install the generic REST routes
        installAPI(s, c.Config)

        return s, nil
    }

First, step into NewAPIServerHandler to see how the handler of the generic APIServer is created.

    func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
        // set up routing for non-REST resources
        nonGoRestfulMux := mux.NewPathRecorderMux(name)
        if notFoundHandler != nil {
            nonGoRestfulMux.NotFoundHandler(notFoundHandler)
        }

        // create the go-restful container that routes REST resources
        gorestfulContainer := restful.NewContainer()
        gorestfulContainer.ServeMux = http.NewServeMux()
        gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
        gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
            logStackOnRecover(s, panicReason, httpWriter)
        })
        gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
            serviceErrorHandler(s, serviceErr, request, response)
        })

        // hand both routers, REST and non-REST, to the director
        director := director{
            name:               name,
            goRestfulContainer: gorestfulContainer,
            nonGoRestfulMux:    nonGoRestfulMux,
        }

        // return the APIServerHandler
        return &APIServerHandler{
            FullHandlerChain:   handlerChainBuilder(director),
            GoRestfulContainer: gorestfulContainer,
            NonGoRestfulMux:    nonGoRestfulMux,
            Director:           director,
        }
    }

kube-apiserver builds its RESTful API routes on the go-restful framework.
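The director in NewAPIServerHandler holds two routers, one for REST resources and one for everything else. The sketch below illustrates one way such a dispatcher can work, using only net/http. It assumes that REST roots are tried first and the non-REST mux is the fallback; the `director` fields, `newDirector`, `serve`, and the example paths are hypothetical and do not reproduce the Kubernetes implementation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// director is a simplified stand-in: restRoots and rest play the role of the
// go-restful container, nonREST the role of the non-go-restful mux.
type director struct {
	restRoots []string
	rest      http.Handler
	nonREST   http.Handler
}

func (d director) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	path := r.URL.Path
	for _, root := range d.restRoots {
		// Match the root itself or anything below it, but not a path
		// that merely shares the prefix, such as "/apifoo" for "/api".
		if path == root || strings.HasPrefix(path, root+"/") {
			d.rest.ServeHTTP(w, r)
			return
		}
	}
	d.nonREST.ServeHTTP(w, r)
}

// newDirector wires a REST handler for two roots and a fallback mux with a
// single health endpoint.
func newDirector() director {
	rest := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "rest")
	})
	nonREST := http.NewServeMux()
	nonREST.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "non-rest")
	})
	return director{restRoots: []string{"/api", "/version"}, rest: rest, nonREST: nonREST}
}

// serve sends a GET request through the handler and reports status and body.
func serve(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	d := newDirector()
	for _, path := range []string{"/api/v1/pods", "/healthz", "/apifoo"} {
		code, body := serve(d, path)
		fmt.Println(path, code, strings.TrimSpace(body))
	}
}
```

The explicit boundary check matters: a plain prefix test would send "/apifoo" to the REST handler even though no REST root owns it.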

Once apiServerHandler has been created, it is assigned to the generic APIServer, GenericAPIServer. Next, step into installAPI(s, c.Config) to see how the generic REST routes are created.

    func installAPI(s *GenericAPIServer, c *Config) {
        ...
        routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
    }

Take one REST route, version, as an example and step into Version.Install.

    // Install registers the APIServer's `/version` handler.
    func (v Version) Install(c *restful.Container) {
        if v.Version == nil {
            return
        }

        // Set up a service to return the git code version.
        versionWS := new(restful.WebService)
        versionWS.Path("/version")
        versionWS.Doc("git code version from which this is built")
        versionWS.Route(
            versionWS.GET("/").To(v.handleVersion).
                Doc("get the code version").
                Operation("getCodeVersion").
                Produces(restful.MIME_JSON).
                Consumes(restful.MIME_JSON).
                Writes(version.Info{}))

        c.Add(versionWS)
    }

    // handleVersion writes the server's version information.
    func (v Version) handleVersion(req *restful.Request, resp *restful.Response) {
        responsewriters.WriteRawJSON(http.StatusOK, *v.Version, resp.ResponseWriter)
    }

As shown, Install registers a route from /version to v.handleVersion.
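The same path-to-handler registration can be tried out without go-restful. The sketch below swaps in the standard library http.ServeMux for the go-restful container; `Info` is a hypothetical one-field version record and `getVersion` is a helper added only to exercise the route.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Info is a hypothetical, trimmed-down version record.
type Info struct {
	GitVersion string `json:"gitVersion"`
}

// Version mirrors the shape of the type in the excerpt: it carries the
// version data and knows how to register its own route.
type Version struct {
	Version *Info
}

// Install registers the /version route. Like the excerpt, it does nothing
// when no version information is available.
func (v Version) Install(mux *http.ServeMux) {
	if v.Version == nil {
		return
	}
	mux.HandleFunc("/version", v.handleVersion)
}

// handleVersion writes the version information as JSON.
func (v Version) handleVersion(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_ = json.NewEncoder(w).Encode(v.Version)
}

// getVersion installs the route on a fresh mux, requests /version, and
// returns the status code together with the decoded body.
func getVersion(info *Info) (int, Info) {
	mux := http.NewServeMux()
	Version{Version: info}.Install(mux)

	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/version", nil))

	var got Info
	if rec.Code == http.StatusOK {
		_ = json.Unmarshal(rec.Body.Bytes(), &got)
	}
	return rec.Code, got
}

func main() {
	code, info := getVersion(&Info{GitVersion: "v0.0.0-demo"})
	fmt.Println(code, info.GitVersion)
}
```

Against a running cluster, the real endpoint can be queried with `kubectl get --raw /version`.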

The next part continues with the server chain creation flow. To be continued...


Original link: https://www.cnblogs.com/xingzheanan/p/17787066.html
