Kubernetes: the interaction between kube-apiserver and etcd
Source: cnblogs  Author: lubanseven  Date: 2023/11/6 9:11:40

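0. Preface

The previous articles covered scheme, a core Kubernetes data structure, and the kube-apiserver startup flow. The startup article concentrated on the core startup logic and did not look at how kube-apiserver interacts with external components.

That interaction matters, though: it defines the component's boundaries and dependencies.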

[Figure: Kubernetes architecture diagram]

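As the Kubernetes architecture diagram shows, kube-apiserver is the only component that talks to etcd. This article therefore singles out the interaction between kube-apiserver and etcd, so that we understand not only what happens but also why.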

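1. Using etcd

Since the topic is interaction, the first thing to understand is how to use the component on the other side. Here kube-apiserver's counterpart is the well-known etcd, which needs little introduction.

Only an example is given: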

    package main

    import (
        "context"
        "fmt"
        "log"
        "time"

        // Module path for etcd v3.5+; older releases used "go.etcd.io/etcd/clientv3".
        clientv3 "go.etcd.io/etcd/client/v3"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   []string{"127.0.0.1:2379"},
            DialTimeout: 5 * time.Second,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()
        fmt.Println("connect to etcd success.")

        // Grant a lease with a TTL of 5 seconds.
        resp, err := cli.Grant(context.TODO(), 5)
        if err != nil {
            log.Fatal(err)
        }

        // Attach the key "name" to the lease; the key is deleted when the lease expires.
        _, err = cli.Put(context.TODO(), "name", "hxia", clientv3.WithLease(resp.ID))
        if err != nil {
            log.Fatal(err)
        }
    }

For details, see go-by-example: etcd and the etcd Quickstart.

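2. kube-apiserver and etcd

Following the interaction between kube-apiserver and etcd from start to finish is complicated, and it is easy to get lost. So we trace it backwards instead: first find the point where the two actually interact, then fan out from that point to map the overall structure.

So where do they interact? That is not hard to answer: in the handler. As the processing unit of the RESTful API, the handler defines the interaction between kube-apiserver and etcd.

Take the handler that serves GET as an example: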

    // kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go
    func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
        ...
        switch action.Verb {
        case "GET": // Get a resource.
            var handler restful.RouteFunction
            if isGetterWithOptions {
                handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
            } else {
                handler = restfulGetResource(getter, reqScope)
            }
            route := ws.GET(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Writes(producedObject)
            addParams(route, action.Params)
            routes = append(routes, route)
        }
        ...
    }

Step into restfulGetResource to see how the handler is created.

    func restfulGetResource(r rest.Getter, scope handlers.RequestScope) restful.RouteFunction {
        return func(req *restful.Request, res *restful.Response) {
            handlers.GetResource(r, &scope)(res.ResponseWriter, req.Request)
        }
    }

    // GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
    func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
        return getResourceHandler(scope,
            func(ctx context.Context, name string, req *http.Request) (runtime.Object, error) {
                ...
                return r.Get(ctx, name, &options)
            })
    }

    // Getter is an object that can retrieve a named RESTful resource.
    type Getter interface {
        // Get finds a resource in the storage by name and returns it.
        // Although it can return an arbitrary error value, IsNotFound(err) is true for the
        // returned error value err when the specified resource is not found.
        Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
    }

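As shown:

  1. restfulGetResource returns a route function, and that route function closes over the getter object passed to restfulGetResource.
  2. Inside the returned route function, the getter's Get method is called to fetch the resource object, a runtime.Object. The getter here is an object that implements the Getter interface.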
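The two points above describe a closure: the handler is built once and keeps a reference to the getter it was given. A minimal sketch of that pattern using only the standard library might look as follows. The Getter interface here is a simplified stand-in (it returns a string rather than a runtime.Object), and mapGetter, getResource, serve, and errNotFound are hypothetical names that do not appear in Kubernetes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Getter is a simplified, hypothetical stand-in for rest.Getter.
type Getter interface {
	Get(ctx context.Context, name string) (string, error)
}

// errNotFound is a hypothetical sentinel error for missing resources.
var errNotFound = errors.New("not found")

// mapGetter is a toy Getter backed by an in-memory map.
type mapGetter map[string]string

func (m mapGetter) Get(_ context.Context, name string) (string, error) {
	v, ok := m[name]
	if !ok {
		return "", errNotFound
	}
	return v, nil
}

// getResource returns a handler that closes over g, in the same way that
// restfulGetResource closes over the getter it receives.
func getResource(g Getter) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		// Use the last path segment as the resource name.
		name := req.URL.Path[strings.LastIndex(req.URL.Path, "/")+1:]
		obj, err := g.Get(req.Context(), name)
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		fmt.Fprint(w, obj)
	}
}

// serve sends one GET request through the handler and returns status and body.
func serve(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := getResource(mapGetter{"foo": "foo-object"})
	fmt.Println(serve(h, "/things/foo")) // existing resource
	fmt.Println(serve(h, "/things/bar")) // missing resource
}
```

The handler itself knows nothing about where the data lives; swapping the Getter implementation changes the backend without touching the routing code.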

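Given this, the question becomes what the Get called on the getter actually does. Tracing upward level by level leads to customResourceDefinitionStorage, a concrete object that implements the Getter interface.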

    // kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
        ...
        if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
            // Call NewREST to create the resource storage customResourceDefinitionStorage.
            customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
            if err != nil {
                return nil, err
            }
            storage[resource] = customResourceDefinitionStorage
            storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
        }
        ...
    }

    func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
        strategy := NewStrategy(scheme)
        store := &genericregistry.Store{
            ...
        }
        options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
        if err := store.CompleteWithOptions(options); err != nil {
            return nil, err
        }
        return &REST{store}, nil
    }

With the concrete object found, the next step is to see what its Get does.

    // kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
    // rest implements a RESTStorage for API services against etcd
    type REST struct {
        *genericregistry.Store
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
    type Store struct {
        ...
        Storage DryRunnableStorage
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
    type DryRunnableStorage struct {
        Storage storage.Interface
        Codec   runtime.Codec
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
    // Get retrieves the item from storage.
    func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
        obj := e.NewFunc()
        key, err := e.KeyFunc(ctx, name)
        if err != nil {
            return nil, err
        }
        if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
            return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
        }
        if e.Decorator != nil {
            e.Decorator(obj)
        }
        return obj, nil
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
    func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
        return s.Storage.Get(ctx, key, opts, objPtr)
    }

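The REST object embeds *genericregistry.Store, so Store's Get method is promoted to REST. Inside Store.Get, e.Storage.Get calls the Get method of DryRunnableStorage, which in turn calls Get on the storage interface held in its Storage field, and that call is what reaches etcd.

DryRunnableStorage.Storage is an interface. What is the concrete object behind it?

Start from the resource storage again and look at how REST{store} is instantiated.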

    // NewREST returns a RESTStorage object that will work against API services.
    func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
        strategy := NewStrategy(scheme)
        store := &genericregistry.Store{
            ...
        }
        options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
        // Step into CompleteWithOptions.
        if err := store.CompleteWithOptions(options); err != nil {
            return nil, err
        }
        return &REST{store}, nil
    }

    func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
        ...
        if e.Storage.Storage == nil {
            e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
                opts.StorageConfig,
                prefix,
                keyFunc,
                e.NewFunc,
                e.NewListFunc,
                attrFunc,
                options.TriggerFunc,
                options.Indexers,
            )
        }
        ...
    }

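At this point we know where the storage.Interface object is instantiated. The opts.Decorator here is a decorator function. The investigation continues with what this decorator does: once that is clear, we have the most important link, which is how the storage interface reaches etcd.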

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
    func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
        ...
        // opts is produced by options.RESTOptions.GetRESTOptions.
        // options.RESTOptions is an object that satisfies the RESTOptionsGetter interface.
        opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
        if err != nil {
            return err
        }
        ...
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/options.go
    type RESTOptionsGetter interface {
        GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
    }

    func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
        strategy := NewStrategy(scheme)
        store := &genericregistry.Store{
            ...
        }
        // Create options.
        options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
        // Pass options to Store.CompleteWithOptions.
        if err := store.CompleteWithOptions(options); err != nil {
            return nil, err
        }
        return &REST{store}, nil
    }

    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
        ...
        if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
            customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
            if err != nil {
                return nil, err
            }
            storage[resource] = customResourceDefinitionStorage
            storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
        }
        ...
    }

As shown, c.GenericConfig.RESTOptionsGetter is the optsGetter, and calling GetRESTOptions on it yields the RESTOptions.

Where is c.GenericConfig.RESTOptionsGetter instantiated?

Remember BuildGenericConfig, which created the generic configuration earlier? That function instantiates c.GenericConfig.RESTOptionsGetter.
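The wiring traced so far (an options getter is handed to the store, the store asks it for options, and the options' Decorator builds the storage only if none is set yet) can be condensed into a small model. This is a sketch under simplifying assumptions: all signatures are cut down, and staticGetter, memStorage, and the string-based resource name are hypothetical and not part of Kubernetes.

```go
package main

import (
	"errors"
	"fmt"
)

// Interface stands in for storage.Interface (hypothetical, simplified).
type Interface interface {
	Get(key string) (string, error)
}

// StorageDecorator builds a storage backend for a resource prefix.
type StorageDecorator func(prefix string) (Interface, error)

// RESTOptions carries the decorator, loosely modelled on generic.RESTOptions.
type RESTOptions struct {
	ResourcePrefix string
	Decorator      StorageDecorator
}

// RESTOptionsGetter mirrors the shape of generic.RESTOptionsGetter.
type RESTOptionsGetter interface {
	GetRESTOptions(resource string) (RESTOptions, error)
}

// memStorage is a toy backend that looks keys up under a prefix.
type memStorage struct {
	prefix string
	data   map[string]string
}

func (m *memStorage) Get(key string) (string, error) {
	v, ok := m.data[m.prefix+"/"+key]
	if !ok {
		return "", errors.New("not found")
	}
	return v, nil
}

// staticGetter is a hypothetical RESTOptionsGetter backed by one shared map.
type staticGetter struct{ data map[string]string }

func (g staticGetter) GetRESTOptions(resource string) (RESTOptions, error) {
	return RESTOptions{
		ResourcePrefix: "/" + resource,
		Decorator: func(prefix string) (Interface, error) {
			return &memStorage{prefix: prefix, data: g.data}, nil
		},
	}, nil
}

// Store fills in its storage only when none was set, like CompleteWithOptions.
type Store struct {
	Resource string
	Storage  Interface
}

func (e *Store) CompleteWithOptions(getter RESTOptionsGetter) error {
	opts, err := getter.GetRESTOptions(e.Resource)
	if err != nil {
		return err
	}
	if e.Storage == nil {
		e.Storage, err = opts.Decorator(opts.ResourcePrefix)
	}
	return err
}

func main() {
	getter := staticGetter{data: map[string]string{"/things/foo": "foo-object"}}
	store := &Store{Resource: "things"}
	if err := store.CompleteWithOptions(getter); err != nil {
		panic(err)
	}
	fmt.Println(store.Storage.Get("foo"))
}
```

The store never names a concrete backend; which one it gets is decided entirely by whoever supplies the getter.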

    // kubernetes/pkg/controlplane/apiserver/config.go
    func BuildGenericConfig(
        s controlplaneapiserver.CompletedOptions,
        schemes []*runtime.Scheme,
        getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
    ) {
        ...
        storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
        storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
        storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
        if lastErr != nil {
            return
        }
        if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
            return
        }
        ...
    }

    func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
        ...
        c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
        return nil
    }

    func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
        if resourceTransformers != nil {
            factory = &transformerStorageFactory{
                delegate:             factory,
                resourceTransformers: resourceTransformers,
            }
        }
        return &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
    }

The process is not complicated: the concrete object behind the RESTOptionsGetter interface is &StorageFactoryRestOptionsFactory.

Calling GetRESTOptions on c.GenericConfig.RESTOptionsGetter therefore actually calls StorageFactoryRestOptionsFactory.GetRESTOptions.

    func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        ...
        ret := generic.RESTOptions{
            StorageConfig:             storageConfig,
            Decorator:                 generic.UndecoratedStorage,
            DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
            EnableGarbageCollection:   f.Options.EnableGarbageCollection,
            ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
            CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
            StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
        }
        ...
        return ret, nil
    }

RESTOptions carries the Decorator. Since the Decorator is our focus, step into generic.UndecoratedStorage to see what kind of function it is.

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
    func UndecoratedStorage(
        config *storagebackend.ConfigForResource,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newFunc func() runtime.Object,
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        trigger storage.IndexerFuncs,
        indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
        return NewRawStorage(config, newFunc, newListFunc, resourcePrefix)
    }

    func NewRawStorage(config *storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) {
        return factory.Create(*config, newFunc, newListFunc, resourcePrefix)
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
    func Create(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
        switch c.Type {
        case storagebackend.StorageTypeETCD2:
            return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
            return newETCD3Storage(c, newFunc, newListFunc, resourcePrefix)
        default:
            return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
    func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
        ...
        client, err := newETCD3Client(c.Transport)
        if err != nil {
            stopCompactor()
            return nil, nil, err
        }
        ...
        client.KV = etcd3.NewETCDLatencyTracker(client.KV)
        ...
        return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
    }

    // New returns an etcd3 implementation of storage.Interface.
    func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
        return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
    }

    func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
        ...
        s := &store{
            client:              c,
            codec:               codec,
            versioner:           versioner,
            transformer:         transformer,
            pagingEnabled:       pagingEnabled,
            pathPrefix:          pathPrefix,
            groupResource:       groupResource,
            groupResourceString: groupResource.String(),
            watcher:             w,
            leaseManager:        newDefaultLeaseManager(c, leaseManagerConfig),
        }
        ...
        return s
    }

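The code above is mostly a straight chain of function calls and needs little explanation.
As shown, what opts.Decorator does is instantiate store, an object that implements the interface for accessing etcd. The store holds the client used to access etcd, and that client is created by newETCD3Client(c.Transport).

That essentially solves the case. Access to etcd goes through store. At this stage the store merely holds the client and has not accessed etcd yet; the actual access happens in the handler.

Looking back at s.Storage.Get inside DryRunnableStorage.Get, it is now clear that it calls the Get method of store.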
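Two points from this walkthrough, that the factory picks a backend by storage type and that the resulting store only holds a client until a request arrives, can be illustrated with a toy model. In the sketch below the constants, kvClient, store, and create are hypothetical names chosen for the example; the call counter exists only to make the absence of I/O at construction time visible.

```go
package main

import "fmt"

// Hypothetical storage type names for this sketch.
const (
	typeUnset = ""
	typeETCD2 = "etcd2"
	typeETCD3 = "etcd3"
)

// kvClient is a toy client that counts how often it is called.
type kvClient struct {
	calls int
	data  map[string]string
}

func (c *kvClient) Get(key string) (string, bool) {
	c.calls++
	v, ok := c.data[key]
	return v, ok
}

// store only holds the client; constructing it performs no I/O.
type store struct{ client *kvClient }

func (s *store) Get(key string) (string, bool) { return s.client.Get(key) }

// create picks a backend by type, in the spirit of factory.Create.
func create(storageType string, client *kvClient) (*store, error) {
	switch storageType {
	case typeETCD2:
		return nil, fmt.Errorf("%s is no longer a supported storage backend", storageType)
	case typeUnset, typeETCD3:
		return &store{client: client}, nil
	default:
		return nil, fmt.Errorf("unknown storage type: %s", storageType)
	}
}

func main() {
	client := &kvClient{data: map[string]string{"k": "v"}}

	s, err := create(typeETCD3, client)
	if err != nil {
		panic(err)
	}
	fmt.Println("client calls after create:", client.calls)

	v, _ := s.Get("k") // the first real access happens here
	fmt.Println("value:", v, "client calls after Get:", client.calls)
}
```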

    // kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
    func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
        return s.Storage.Get(ctx, key, opts, objPtr)
    }

    // kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
    // Get implements storage.Interface.Get.
    func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
        preparedKey, err := s.prepareKey(key)
        if err != nil {
            return err
        }
        startTime := time.Now()
        // Use the client to read the value stored under the key.
        getResp, err := s.client.KV.Get(ctx, preparedKey)
        ... // the err check and the empty-result check are omitted in this excerpt
        kv := getResp.Kvs[0]

        data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
        if err != nil {
            recordDecodeError(s.groupResourceString, preparedKey)
            return err
        }
        return nil
    }
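The steps in store.Get (prepare the key, read the raw bytes, transform them, decode into the output object) can be followed end to end in a self-contained sketch. Everything here is an assumption made for illustration: a map replaces the etcd client, JSON replaces the Kubernetes codec, the transform is an identity function, and the key validation rule and the Pod type are invented for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"path"
	"strings"
)

// Pod is a hypothetical payload type standing in for a runtime.Object.
type Pod struct {
	Name  string `json:"name"`
	Image string `json:"image"`
}

// errNotFound is a hypothetical sentinel error for missing keys.
var errNotFound = errors.New("key not found")

// store is a toy version of the etcd3 store: a map replaces the etcd client.
type store struct {
	pathPrefix string
	kv         map[string][]byte
	// transform stands in for TransformFromStorage (for example, decryption).
	transform func(stored []byte) ([]byte, error)
}

// prepareKey joins the key onto the store prefix.
// The validation rule used here is an assumption of this sketch.
func (s *store) prepareKey(key string) (string, error) {
	if key == "" || strings.Contains(key, "..") {
		return "", fmt.Errorf("invalid key: %q", key)
	}
	return path.Join(s.pathPrefix, key), nil
}

// Get follows the same order as the excerpt: prepare key, read, transform, decode.
func (s *store) Get(key string, out interface{}) error {
	preparedKey, err := s.prepareKey(key)
	if err != nil {
		return err
	}
	raw, ok := s.kv[preparedKey]
	if !ok {
		return errNotFound
	}
	data, err := s.transform(raw)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

// newDemoStore returns a store preloaded with one object (hypothetical helper).
func newDemoStore() *store {
	return &store{
		pathPrefix: "/registry",
		kv: map[string][]byte{
			"/registry/pods/default/web": []byte(`{"name":"web","image":"nginx"}`),
		},
		transform: func(b []byte) ([]byte, error) { return b, nil }, // identity
	}
}

func main() {
	var p Pod
	if err := newDemoStore().Get("pods/default/web", &p); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```

Keeping the transform step separate from decoding is what lets the stored bytes differ from the encoded object, for example when values are encrypted at rest.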

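To wrap up, this article walked through the interaction between kube-apiserver and etcd. The next article will focus on how kube-apiserver handles authorization, authentication, and admission control.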


Original article: https://www.cnblogs.com/xingzheanan/p/17810847.html
