1. 概述
本节标题写的是 Informer,不过我们的内容不局限于狭义的 Informer 部分,只是 Informer 最有代表性,其他的 Reflector 等也不好独立开来讲。
Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,着手写 kube-controller-manager 了,鉴于 controlelr 和 client-go 关联比较大,跳过来先讲讲典型的控制器工作流程中涉及到的 client-go 部分).
Informer 是 client-go 中一个比较核心的工具,通过 Informer(实际我们用到的都不是单纯的 informer,而是组合了各种工具的 sharedInformerFactory) 我们可以轻松 List/Get 某个资源对象,可以监听资源对象的各种事件(比如创建和删除)然后触发回调函数,让我们能够在各种事件发生的时候能够作出相应的逻辑处理。举个例字,当 pod 数量变化的时候 deployment 是不是需要判断自己名下的 pod 数量是否还和预期的一样?如果少了是不是要考虑创建?
2. 架构概览
自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。

我们开发自定义控制器的时候用到的“机制”主要定义在 client-go 的 tool/cache下:

我们根据图中的9个步骤来跟源码
3. reflector - List & Watch API Server
Reflector 会监视特定的资源,将变化写入给定的存储中,也就是 Delta FIFO queue.
3.1. Reflector 对象
Reflector 的中文含义是反射器,我们先看一下类型定义:
tools/cache/reflector.go:47
type Reflector struct {
name string
metrics *reflectorMetrics
expectedType reflect.Type
store Store
listerWatcher ListerWatcher
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
clock clock.Clock
lastSyncResourceVersion string
lastSyncResourceVersionMutex sync.RWMutex
}
reflector.go
中主要就 Reflector 这个 struct 和相关的一些函数:

3.2. ListAndWatch
ListAndWatch 首先 list 所有 items,获取当前的资源版本信息,然后使用这个版本信息来 watch(也就是从这个版本开始的所有资源变化会被关注)。我们看一下这里的 ListAndWatch 方法主要逻辑:
tools/cache/reflector.go:168
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
4. watchHandler - add obj to delta fifo
前面讲到 ListAndWatch 函数的最后一步逻辑是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中了。
tools/cache/reflector.go:287
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
5.1. Controller
一个 Informer 需要实现 Controller 接口:
tools/cache/controller.go:82
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
一个基础的 Controller 实现如下:
tools/cache/controller.go:75
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
controller 类型结构如下:

可以看到主要对外暴露的逻辑是 Run() 方法,我们看一下 Run() 中的逻辑:
tools/cache/controller.go:100
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
5.2. processLoop
tools/cache/controller.go:148
func (c *controller) processLoop() {
for {
这里的 Queue 就是 Delta FIFO,Pop 是个阻塞方法,内部实现时会逐个 pop 队列中的数据,交给 PopProcessFunc 处理。我们先不看 Pop 的实现,关注一下 PopProcessFunc 是如何处理 Pop 中从队列拿出来的 item 的。
PopProcessFunc 是一个类型:
type PopProcessFunc func(interface{}) error
所以这里只是一个类型转换,我们关注c.config.Process
就行:
tools/cache/controller.go:367
Process: func(obj interface{}) error {
for _, d := range obj.(Deltas) {
switch d.Type {
这里涉及到2个点:
- clientState
- ResourceEventHandler (h)
我们一一来看
6. Add obj to Indexer (Thread safe store)
前面说到 clientState,这个变量的初始化是clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
NewIndexer 代码如下:
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
顺带看一下 NewThreadSafeStore()
tools/cache/thread_safe_store.go:298
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
然后关注一下 Process 中的err := clientState.Add(d.Object)
的 Add() 方法:
func (c *cache) Add(obj interface{}) error {
cacheStorage 是一个 ThreadSafeStore 实例,这个 Add() 代码如下:
tools/cache/thread_safe_store.go:68
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
第四步和第五步的内容先分析到这里,后面关注 threadSafeMap 实现的时候再继续深入。
第六步是 Dispatch Event Handler functions(Send Object to Custom Controller)
我们先看一个接口 SharedInformer:
tools/cache/shared_informer.go:43
type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler)
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
GetStore() Store
GetController() Controller
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
SharedInformer 有一个共享的 data cache,能够分发 changes 通知到缓存,到通过 AddEventHandler 注册了的 listerners. 当你接收到一个通知,缓存的内容能够保证至少和通知中的一样新。
再看一下 SharedIndexInformer 接口:
tools/cache/shared_informer.go:66
type SharedIndexInformer interface {
SharedInformer
相比 SharedInformer 增加了一个 Indexer. 然后看具体的实现 sharedIndexInformer 吧:
tools/cache/shared_informer.go:127
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector CacheMutationDetector
listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}
这个类型内包了很多我们前面看到过的对象,indexer、controller、listeratcher 都不陌生,我们看这里的 processor 是做什么的:
7.1. sharedProcessor
类型定义如下:
tools/cache/shared_informer.go:375
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
这里的重点明显是 listeners 属性了,我们继续看 listeners 的类型中的 processorListener:
7.1.1. processorListener
tools/cache/shared_informer.go:466
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
processorListener 主要有2个方法:
7.1.2. processorListener.run()
先看一下这个 run 做了什么:
tools/cache/shared_informer.go:540
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
这个 run 过程不复杂,等待信号然后调用 handler 的增删改方法做对应的处理逻辑。case 里的 Notification 再看一眼:
tools/cache/shared_informer.go:176
type updateNotification struct {
oldObj interface{}
newObj interface{}
}
type addNotification struct {
newObj interface{}
}
type deleteNotification struct {
oldObj interface{}
}
另外注意到for next := range p.nextCh
是下面的 case 执行的前提,也就是说触发点是 p.nextCh,我们接着看 pop 过程(这里的逻辑不简单,可能得多花点精力)
7.1.3. processorListener.pop()
tools/cache/shared_informer.go:510
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh)
这里的 pop 逻辑的入口是<-p.addCh
,我们继续向上找一下这个 addCh 的来源:
7.1.4. processorListener.add()
tools/cache/shared_informer.go:506
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
这个 add() 方法又在哪里被调用呢?
7.1.5. sharedProcessor.distribute()
tools/cache/shared_informer.go:400
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
这个方法逻辑比较简洁,分发对象。我们继续看哪里进入的 distribute:
tools/cache/shared_informer.go:344
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
继续往前看代码逻辑。
tools/cache/shared_informer.go:189
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
看到这里已经挺和谐了,在 sharedIndexInformer 的 Run() 方法中先是创建一个 DeltaFIFO,然后和 lw 一起初始化 cfg,利用 cfg 创建 controller,最后 Run 这个 controller,也就是最基础的 informer.
在这段代码里我们还注意到有一步是s.processor.run
,我们看一下这个 run 的逻辑。
7.3.1. sharedProcessor.run()
tools/cache/shared_informer.go:415
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
撇开细节,可以看到这里调用了内部所有 listener 的 run() 和 pop() 方法,和前面的分析呼应上了。
到这里,我们基本讲完了自定义 controller 的时候 client-go 里相关的逻辑,也就是图中的上半部分:

