- Pod priority
- The preempt entry point
- The preempt implementation
  - SchedulingQueue
  - PodPreemptor
  - xx.Algorithm.Preempt
    - Interface definition
    - Overall flow
    - podEligibleToPreemptOthers
    - nodesWherePreemptionMightHelp
    - selectNodesForPreemption
    - pickOneNodeForPreemption
- Summary
1. Pod priority
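Priority-based scheduling and preemption only make sense once pods have a priority. A higher-priority pod can move ahead of others in the scheduling queue and get first pick of nodes. In addition, when a high-priority pod cannot find a suitable node, the scheduler checks whether it could run on some node if lower-priority pods there were evicted. If so, one or more lower-priority pods on that node are evicted so that the high-priority pod can run on it.
Today we analyze the code related to pod preemption. Before we start, here are two example configuration files related to priority: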
PriorityClass example:
```yaml
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for XYZ service pods only."
```
A Pod that uses the PriorityClass above:
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: nginx
  labels:
    env: test
spec:
  containers:
  - name: nginx
    image: nginx
    imagePullPolicy: IfNotPresent
  priorityClassName: high-priority
```
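I won't explain these two files here; if you are not familiar with Pod priority, please read the official documentation first. Below we look at the preemption-related logic in the scheduler.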
2. The preempt entry point
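In the `scheduleOne()` method (pkg/scheduler/scheduler.go:513), last time we focused on the line `suggestedHost, err := sched.schedule(pod)`, that is, on how the scheduler normally matches a pod with the most suitable node. Today we look at how the preempt process starts when this line returns a non-nil `err`.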
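pkg/scheduler/scheduler.go:529
```go
suggestedHost, err := sched.schedule(pod)
if err != nil {
	// schedule() fails with a FitError when the pod fits on no node; in that
	// case try to preempt lower-priority pods so it fits on a later attempt.
	if fitError, ok := err.(*core.FitError); ok {
		preemptionStartTime := time.Now()
		sched.preempt(pod, fitError)
		metrics.PreemptionAttempts.Inc()
		// (The original also uses preemptionStartTime to record preemption
		// latency metrics; those lines are omitted from this excerpt.)
	} else {
		klog.Errorf("error selecting node for pod: %v", err)
		metrics.PodScheduleErrors.Inc()
	}
	return
}
```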
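When `schedule()` returns no host, i.e. no suitable node was found, the preempt process is triggered and execution enters `sched.preempt(pod, fitError)`. Let's first look at the overall logic of this function, then dig into the sub-steps it involves: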
pkg/scheduler/scheduler.go:311
```go
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	// Preemption applies only when pod priority is enabled and preemption is not disabled.
	if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
		return "", nil
	}
	// Re-fetch the preemptor so we work on its latest state.
	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
	if err != nil {
		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}
	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	var nodeName = ""
	if node != nil {
		nodeName = node.Name
		// Record the nomination in the queue, then persist it to the pod status;
		// roll the queue entry back if the API update fails.
		sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
		err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
		if err != nil {
			sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
			return "", err
		}
		// Evict every victim and emit a "Preempted" event for each one.
		for _, victim := range victims {
			if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
				return "", err
			}
			sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
		}
	}
	// Clear the nomination of lower-priority pods that may no longer fit on that node.
	for _, p := range nominatedPodsToClear {
		rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
		if rErr != nil {
			klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
		}
	}
	return nodeName, err
}
```
3. The preempt implementation
The `preempt()` function above involves several objects worth a closer look; let's go through their implementations one by one.
3.1. SchedulingQueue
SchedulingQueue is a queue that stores pods waiting to be scheduled:
pkg/scheduler/internal/queue/scheduling_queue.go:60
```go
type SchedulingQueue interface {
	Add(pod *v1.Pod) error
	AddIfNotPresent(pod *v1.Pod) error
	AddUnschedulableIfNotPresent(pod *v1.Pod) error
	Pop() (*v1.Pod, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveQueue()
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	NominatedPodsForNode(nodeName string) []*v1.Pod
	WaitingPods() []*v1.Pod
	Close()
	UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
	DeleteNominatedPodIfExists(pod *v1.Pod)
	NumUnschedulablePods() int
}
```
In the scheduler, the SchedulingQueue interface has two implementations:
- FIFO: a first-in, first-out queue
- PriorityQueue: a priority queue
3.1.1. FIFO
The FIFO struct is a thin wrapper around cache.FIFO that implements the SchedulingQueue interface.
pkg/scheduler/internal/queue/scheduling_queue.go:97
```go
type FIFO struct {
	*cache.FIFO
}
```
cache.FIFO is defined in vendor/k8s.io/client-go/tools/cache/fifo.go:93; we won't go into the details of this FIFO queue here.
3.1.2. PriorityQueue
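PriorityQueue also implements the SchedulingQueue interface; the top of a PriorityQueue is the highest-priority pending pod. This PriorityQueue has two sub-queues: activeQ holds pods waiting to be scheduled, and unschedulableQ holds pods that have already been tried, failed to schedule, and been marked unschedulable.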
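Here is the definition of the PriorityQueue struct:
pkg/scheduler/internal/queue/scheduling_queue.go:201
```go
type PriorityQueue struct {
	stop  <-chan struct{}
	clock util.Clock
	// lock guards the queue; cond wakes up Pop() callers when pods arrive.
	lock sync.RWMutex
	cond sync.Cond
	// activeQ: heap of pods waiting to be scheduled, highest priority on top.
	activeQ *Heap
	// unschedulableQ: pods that were tried and marked unschedulable.
	unschedulableQ *UnschedulablePodsMap
	// nominatedPods: which pods are nominated to run on which nodes.
	nominatedPods       *nominatedPodMap
	receivedMoveRequest bool
	closed              bool
}
```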
PriorityQueue's methods are fairly easy to understand; let's look at a few of them:
1) func (p *PriorityQueue) Add(pod *v1.Pod) error: adds a pod to the active queue.
pkg/scheduler/internal/queue/scheduling_queue.go:276
```go
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	err := p.activeQ.Add(pod)
	if err != nil {
		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
	} else {
		// A pod should not be in both queues; drop a stale unschedulable copy.
		if p.unschedulableQ.get(pod) != nil {
			klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
			p.unschedulableQ.delete(pod)
		}
		// Track the pod's nomination (if any) and wake up goroutines blocked in Pop().
		p.nominatedPods.add(pod, "")
		p.cond.Broadcast()
	}
	return err
}
```
2) func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error: adds the pod to the active queue only if it is in neither of the two queues.
pkg/scheduler/internal/queue/scheduling_queue.go:295
```go
func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	// Already waiting in either queue: nothing to do.
	if p.unschedulableQ.get(pod) != nil {
		return nil
	}
	if _, exists, _ := p.activeQ.Get(pod); exists {
		return nil
	}
	err := p.activeQ.Add(pod)
	if err != nil {
		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
	} else {
		p.nominatedPods.add(pod, "")
		p.cond.Broadcast()
	}
	return err
}
```
3) func (p *PriorityQueue) flushUnschedulableQLeftover(): flushes unschedulableQ; any pod that has stayed there longer than unschedulableQTimeInterval is moved to activeQ.
pkg/scheduler/internal/queue/scheduling_queue.go:346
```go
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()
	var podsToMove []*v1.Pod
	currentTime := p.clock.Now()
	for _, pod := range p.unschedulableQ.pods {
		// Pods whose last scheduling attempt is older than the interval get another chance.
		lastScheduleTime := podTimestamp(pod)
		if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pod)
		}
	}
	if len(podsToMove) > 0 {
		p.movePodsToActiveQueue(podsToMove)
	}
}
```
4) func (p *PriorityQueue) Pop() (*v1.Pod, error): pops a pod from activeQ, blocking while activeQ is empty.
pkg/scheduler/internal/queue/scheduling_queue.go:367
```go
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	// Wait until a pod is available or the queue is closed.
	for len(p.activeQ.data.queue) == 0 {
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pod := obj.(*v1.Pod)
	p.receivedMoveRequest = false
	return pod, err
}
```
Next, a few methods that operate on the PriorityQueue.nominatedPods field, i.e. the ones `preempt()` calls several times:
5) func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string): during preemption, once a node that can run the pod has been found, this method nominates the pod to that node.
pkg/scheduler/internal/queue/scheduling_queue.go:567
```go
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
	p.lock.Lock()
	p.nominatedPods.add(pod, nodeName)
	p.lock.Unlock()
}
```
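First, the type of the nominatedPods field, which records which pods are nominated to which nodes:
pkg/scheduler/internal/queue/scheduling_queue.go:822
```go
type nominatedPodMap struct {
	// node name -> pods nominated to that node
	nominatedPods map[string][]*v1.Pod
	// pod UID -> nominated node name (reverse index)
	nominatedPodToNode map[ktypes.UID]string
}
```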
Then the implementation of `add()`:
pkg/scheduler/internal/queue/scheduling_queue.go:832
```go
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
	// Drop any previous nomination first so the pod is tracked at most once.
	npm.delete(p)
	nnn := nodeName
	if len(nnn) == 0 {
		// Fall back to the nominated node name recorded on the pod itself.
		nnn = NominatedNodeName(p)
		if len(nnn) == 0 {
			return
		}
	}
	npm.nominatedPodToNode[p.UID] = nnn
	for _, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
			return
		}
	}
	npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}
```
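The two maps in nominatedPodMap act as a forward index and a reverse index that `add()` keeps in sync. The sketch below reproduces the `add()` logic with simplified, hypothetical types (`pod`, `nominatedPods`). Its `remove()` method is an assumed counterpart of the `delete()` call, whose body this article does not show, and the defensive duplicate check is omitted because `remove()` always runs first.

```go
package main

import "fmt"

// pod is a hypothetical stand-in for *v1.Pod.
type pod struct {
	UID               string
	Name              string
	NominatedNodeName string // plays the role of pod.Status.NominatedNodeName
}

// nominatedPods is a simplified nominatedPodMap: node -> pods, plus UID -> node.
type nominatedPods struct {
	byNode map[string][]*pod
	toNode map[string]string
}

func newNominatedPods() *nominatedPods {
	return &nominatedPods{byNode: map[string][]*pod{}, toNode: map[string]string{}}
}

// remove drops p from both indexes (assumed behaviour of delete()).
func (n *nominatedPods) remove(p *pod) {
	node, ok := n.toNode[p.UID]
	if !ok {
		return
	}
	list := n.byNode[node]
	for i, q := range list {
		if q.UID == p.UID {
			n.byNode[node] = append(list[:i], list[i+1:]...)
			break
		}
	}
	if len(n.byNode[node]) == 0 {
		delete(n.byNode, node)
	}
	delete(n.toNode, p.UID)
}

// add follows add() above: forget the old entry, fall back to the pod's own
// nominated node name, and ignore pods that are not nominated anywhere.
func (n *nominatedPods) add(p *pod, nodeName string) {
	n.remove(p)
	if nodeName == "" {
		nodeName = p.NominatedNodeName
		if nodeName == "" {
			return
		}
	}
	n.toNode[p.UID] = nodeName
	n.byNode[nodeName] = append(n.byNode[nodeName], p)
}

func main() {
	m := newNominatedPods()
	p := &pod{UID: "u1", Name: "nginx"}
	m.add(p, "node-a")
	m.add(p, "node-b") // re-nominating moves the pod instead of duplicating it
	fmt.Println(len(m.byNode["node-a"]), len(m.byNode["node-b"]), m.toNode["u1"]) // 0 1 node-b
}
```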
3.2. PodPreemptor
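PodPreemptor is used to evict pods and to update a pod's nominated node (which, as SetNominatedNodeName below shows, is stored in pod.Status.NominatedNodeName rather than in an annotation).
pkg/scheduler/factory/factory.go:145
```go
type PodPreemptor interface {
	GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
	DeletePod(pod *v1.Pod) error
	SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
	RemoveNominatedNodeName(pod *v1.Pod) error
}
```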
The type that implements this interface is:
pkg/scheduler/factory/factory.go:1620
```go
type podPreemptor struct {
	Client clientset.Interface
}
```
This type has four methods:
pkg/scheduler/factory/factory.go:1624
```go
// GetUpdatedPod fetches the latest version of the pod from the API server.
func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
	return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}

// DeletePod evicts a victim by deleting it through the API server.
func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
	return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}

// SetNominatedNodeName writes the nominated node into the pod's status.
func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
	podCopy := pod.DeepCopy()
	podCopy.Status.NominatedNodeName = nominatedNodeName
	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
	return err
}

// RemoveNominatedNodeName clears the nomination; a no-op if none is set.
func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
	if len(pod.Status.NominatedNodeName) == 0 {
		return nil
	}
	return p.SetNominatedNodeName(pod, "")
}
```
3.3. xx.Algorithm.Preempt
3.3.1. Interface definition
Let's return to an interface we mentioned quite a while ago when discussing the regular scheduling process:
pkg/scheduler/algorithm/scheduler_interface.go:78
```go
type ScheduleAlgorithm interface {
	Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
	Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
	Predicates() map[string]FitPredicate
	Prioritizers() []PriorityConfig
}
```
Last time we covered this interface we focused on `Schedule()`, `Predicates()` and `Prioritizers()`; this time we look at how `Preempt()` is implemented.
3.3.2. Overall flow
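`Preempt()` is also implemented by the `genericScheduler` type (pkg/scheduler/core/generic_scheduler.go:98). Let's first make sense of the long English comment above the method:
- Preempt finds a node where "pod" could be scheduled after preemption.
- Preempt chooses a node, preempts pods on it, and returns:
  - the chosen node
  - the preempted pods
  - the list of pods whose nominated node name should be cleared
  - a possible error
- Preempt does not update its snapshot (snapshot logic will be covered another time); it reuses the snapshot from the scheduling cycle. This avoids a situation where preemption finds a node that could run "pod" without evicting any pods.
- When many pending pods are in the scheduling queue, a nominated pod goes back into the queue behind other pods of the same priority.
- The nominated pod prevents other pods from using the nominated resources, and it may take a long time before it is retried after many other pending pods.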
Let's go through the overall flow first, then analyze each sub-call:
pkg/scheduler/core/generic_scheduler.go:251
```go
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Only a FitError (the pod fits on no node) is a reason to consider preemption.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	allNodes, err := nodeLister.List()
	if err != nil {
		return nil, nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	// Keep only nodes whose predicate failures could be fixed by evicting pods.
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// Return the pod itself so its stale nomination (if any) is cleared.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	pdbs, err := g.pdbLister.List(labels.Everything())
	if err != nil {
		return nil, nil, nil, err
	}
	// For each potential node, compute the victims that would have to be evicted.
	nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
		g.predicateMetaProducer, g.schedulingQueue, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, err
	}
	// Lower-priority pods nominated to this node may no longer fit there,
	// so they are returned for their nomination to be cleared.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
	}
	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}
```
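The function above calls several sub-steps; let's go through them one by one:
- podEligibleToPreemptOthers(): how does it decide whether the pod may preempt?
- nodesWherePreemptionMightHelp(): how are the candidate nodes for preemption found?
- selectNodesForPreemption(): what does this step compute?
- pickOneNodeForPreemption(): how is the most suitable node to preempt on chosen?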
3.3.3. podEligibleToPreemptOthers
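podEligibleToPreemptOthers decides whether a pod should preempt other pods. If the pod has already preempted other pods and those pods are still within their graceful termination period, it should not preempt again:
- If a node has already been nominated for this pod and that node has terminating pods with lower priority than this pod, no further pods are considered for eviction.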
The logic is simple, so let's go straight to the source:
pkg/scheduler/core/generic_scheduler.go:1110
```go
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
	nomNodeName := pod.Status.NominatedNodeName
	if len(nomNodeName) > 0 {
		if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
			for _, p := range nodeInfo.Pods() {
				// A lower-priority pod is still terminating on the nominated node:
				// an earlier preemption is in progress, so wait for it.
				if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
					return false
				}
			}
		}
	}
	return true
}
```
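The eligibility rule boils down to a single scan of the nominated node's pods. A self-contained sketch with hypothetical types (`nodePod`, `eligibleToPreempt`), where a `Terminating` flag stands in for `DeletionTimestamp != nil`:

```go
package main

import "fmt"

// nodePod is a hypothetical stand-in for a pod already placed on a node.
type nodePod struct {
	Priority    int32
	Terminating bool // stands in for p.DeletionTimestamp != nil
}

// eligibleToPreempt mirrors podEligibleToPreemptOthers: a preemptor that is
// already nominated to a node still holding lower-priority terminating pods
// should wait for them instead of preempting again.
func eligibleToPreempt(priority int32, nominatedNode string, podsByNode map[string][]nodePod) bool {
	if nominatedNode == "" {
		return true
	}
	pods, found := podsByNode[nominatedNode]
	if !found {
		return true
	}
	for _, p := range pods {
		if p.Terminating && p.Priority < priority {
			return false
		}
	}
	return true
}

func main() {
	nodes := map[string][]nodePod{
		"node-a": {{Priority: 10, Terminating: true}},
	}
	fmt.Println(eligibleToPreempt(1000, "node-a", nodes)) // false: a victim is still terminating
	fmt.Println(eligibleToPreempt(1000, "", nodes))       // true: not nominated anywhere yet
}
```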
3.3.4. nodesWherePreemptionMightHelp
nodesWherePreemptionMightHelp looks for nodes that failed the predicates phase but where preemption might still make scheduling succeed. A node is dropped as soon as one of its failed predicates is something that evicting pods cannot fix (node selector, affinity, taints, node conditions, volume conflicts, and so on).
This function isn't long either; here is the code:
pkg/scheduler/core/generic_scheduler.go:1060
```go
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
	potentialNodes := []*v1.Node{}
	for _, node := range nodes {
		unresolvableReasonExist := false
		failedPredicates, _ := failedPredicatesMap[node.Name]
		for _, failedPredicate := range failedPredicates {
			switch failedPredicate {
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodAffinityRulesNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrTaintsTolerationsNotMatch,
				predicates.ErrNodeLabelPresenceViolated,
				predicates.ErrNodeNotReady,
				predicates.ErrNodeNetworkUnavailable,
				predicates.ErrNodeUnderDiskPressure,
				predicates.ErrNodeUnderPIDPressure,
				predicates.ErrNodeUnderMemoryPressure,
				predicates.ErrNodeOutOfDisk,
				predicates.ErrNodeUnschedulable,
				predicates.ErrNodeUnknownCondition,
				predicates.ErrVolumeZoneConflict,
				predicates.ErrVolumeNodeConflict,
				predicates.ErrVolumeBindConflict:
				// Evicting pods cannot fix any of these failures.
				unresolvableReasonExist = true
				break // note: exits only the switch, not the for loop
			}
		}
		if !unresolvableReasonExist {
			klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
			potentialNodes = append(potentialNodes, node)
		}
	}
	return potentialNodes
}
```
3.3.5. selectNodesForPreemption
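This function checks all potential nodes concurrently and determines, for each one, whether evicting pods there would let the pod be scheduled.
Before reading the function body, let's look at its return type, `map[*v1.Node]*schedulerapi.Victims`. The key is obvious, but what is the value?
```go
type Victims struct {
	// Pods selected for eviction on this node.
	Pods []*v1.Pod
	// Number of PodDisruptionBudget constraints the eviction would violate.
	NumPDBViolations int
}
```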
Pods are the pods selected for eviction; NumPDBViolations is how many PDB constraints would be violated. Naturally, the goal is to respect PDBs as much as possible and to avoid conflicting with them whenever that can be avoided.
Next, the overall flow of the function:
pkg/scheduler/core/generic_scheduler.go:895
```go
func selectNodesForPreemption(pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	potentialNodes []*v1.Node,
	predicates map[string]algorithm.FitPredicate,
	metadataProducer algorithm.PredicateMetadataProducer,
	queue internalqueue.SchedulingQueue,
	pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
	nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
	var resultLock sync.Mutex
	meta := metadataProducer(pod, nodeNameToInfo)
	checkNode := func(i int) {
		nodeName := potentialNodes[i].Name
		// Each worker needs its own copy because selectVictimsOnNode mutates the metadata.
		var metaCopy algorithm.PredicateMetadata
		if meta != nil {
			metaCopy = meta.ShallowCopy()
		}
		pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
		if fits {
			// Only nodes where preemption works are recorded; the map is shared, hence the lock.
			resultLock.Lock()
			victims := schedulerapi.Victims{
				Pods:             pods,
				NumPDBViolations: numPDBViolations,
			}
			nodeToVictims[potentialNodes[i]] = &victims
			resultLock.Unlock()
		}
	}
	// Check up to 16 nodes in parallel.
	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
	return nodeToVictims, nil
}
```
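The core logic of the function above lives in selectVictimsOnNode. It tries to find the smallest set of pods on a given node whose eviction would let "pod" run on that node.
The victims must also satisfy one constraint: never evict a higher-priority pod when evicting lower-priority ones would do.
The algorithm first checks whether "pod" can be scheduled once all lower-priority pods on the node are evicted. If so, it sorts those pods by priority and splits them into two groups, those whose eviction would violate a PDB and those whose eviction would not; each group stays sorted by priority. It then tries to reprieve (put back) the pods of the PDB-violating group one by one, followed by the pods of the non-violating group, always keeping "pod" able to fit on the node. In other words, if putting a pod back would make "pod" stop fitting, that pod cannot be spared and remains a victim. After every pod has been tried, the pods that could not be reprieved form the (greedily minimized) victim set.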
Here is the implementation:
pkg/scheduler/core/generic_scheduler.go:983
```go
func selectVictimsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	nodeInfo *schedulercache.NodeInfo,
	fitPredicates map[string]algorithm.FitPredicate,
	queue internalqueue.SchedulingQueue,
	pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
	if nodeInfo == nil {
		return nil, 0, false
	}
	potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
	// Work on a clone so the cached NodeInfo is not modified.
	nodeInfoCopy := nodeInfo.Clone()
	removePod := func(rp *v1.Pod) {
		nodeInfoCopy.RemovePod(rp)
		if meta != nil {
			meta.RemovePod(rp)
		}
	}
	addPod := func(ap *v1.Pod) {
		nodeInfoCopy.AddPod(ap)
		if meta != nil {
			meta.AddPod(ap, nodeInfoCopy)
		}
	}
	// Remove every lower-priority pod; these are the potential victims.
	podPriority := util.GetPodPriority(pod)
	for _, p := range nodeInfoCopy.Pods() {
		if util.GetPodPriority(p) < podPriority {
			potentialVictims.Items = append(potentialVictims.Items, p)
			removePod(p)
		}
	}
	potentialVictims.Sort()
	// If the pod does not fit even with all of them gone, preemption here cannot help.
	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
		if err != nil {
			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
		}
		return nil, 0, false
	}
	var victims []*v1.Pod
	numViolatingVictim := 0
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
	// reprievePod adds p back; if the pod then no longer fits, p stays a victim.
	reprievePod := func(p *v1.Pod) bool {
		addPod(p)
		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
		if !fits {
			removePod(p)
			victims = append(victims, p)
			klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
		}
		return fits
	}
	// Try PDB-violating pods first so as few PDB violations as possible remain.
	for _, p := range violatingVictims {
		if !reprievePod(p) {
			numViolatingVictim++
		}
	}
	for _, p := range nonViolatingVictims {
		reprievePod(p)
	}
	return victims, numViolatingVictim, true
}
```
3.3.6. pickOneNodeForPreemption
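pickOneNodeForPreemption selects one node from the given nodes. It assumes that the victim pods in each value of the map are sorted by priority in descending order. A node with no victims at all is returned immediately; otherwise the criteria, applied in order (each one only breaks ties left by the previous one), are:
- the fewest PDB violations
- the lowest "highest victim priority"
- the smallest sum of victim priorities
- the fewest victims
- if still tied, simply return the first one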
pkg/scheduler/core/generic_scheduler.go:788
```go
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
	if len(nodesToVictims) == 0 {
		return nil
	}
	// Round 1: fewest PDB violations (a node needing no victims wins outright).
	minNumPDBViolatingPods := math.MaxInt32
	var minNodes1 []*v1.Node
	lenNodes1 := 0
	for node, victims := range nodesToVictims {
		if len(victims.Pods) == 0 {
			return node
		}
		numPDBViolatingPods := victims.NumPDBViolations
		if numPDBViolatingPods < minNumPDBViolatingPods {
			minNumPDBViolatingPods = numPDBViolatingPods
			minNodes1 = nil
			lenNodes1 = 0
		}
		if numPDBViolatingPods == minNumPDBViolatingPods {
			minNodes1 = append(minNodes1, node)
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}
	// Round 2: lowest highest-priority victim (Pods[0], since victims are sorted descending).
	minHighestPriority := int32(math.MaxInt32)
	var minNodes2 = make([]*v1.Node, lenNodes1)
	lenNodes2 := 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		victims := nodesToVictims[node]
		highestPodPriority := util.GetPodPriority(victims.Pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			lenNodes2 = 0
		}
		if highestPodPriority == minHighestPriority {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}
	// Round 3: smallest sum of priorities; the MaxInt32+1 offset keeps every term
	// positive, so negative priorities cannot make more victims look cheaper.
	minSumPriorities := int64(math.MaxInt64)
	lenNodes1 = 0
	for i := 0; i < lenNodes2; i++ {
		var sumPriorities int64
		node := minNodes2[i]
		for _, pod := range nodesToVictims[node].Pods {
			sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			lenNodes1 = 0
		}
		if sumPriorities == minSumPriorities {
			minNodes1[lenNodes1] = node
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}
	// Round 4: fewest victims; any remaining tie goes to the first node.
	minNumPods := math.MaxInt32
	lenNodes2 = 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		numPods := len(nodesToVictims[node].Pods)
		if numPods < minNumPods {
			minNumPods = numPods
			lenNodes2 = 0
		}
		if numPods == minNumPods {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 > 0 {
		return minNodes2[0]
	}
	klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
	return nil
}
```
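Because each criterion only breaks the ties left by the previous one, the cascade is equivalent to picking the minimum of the tuple (PDB violations, highest victim priority, sum of priorities, number of victims) in lexicographic order. The sketch below uses hypothetical types (`nodeVictims`, `pickNode`) and keeps the MaxInt32+1 offset from the original. Unlike the original, which iterates a Go map and therefore has no fixed "first" node (map iteration order is randomized), it sorts node names so that final ties are deterministic.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// nodeVictims is a hypothetical summary of schedulerapi.Victims: victim
// priorities sorted in descending order plus the PDB violation count.
type nodeVictims struct {
	Priorities    []int32
	PDBViolations int
}

// score builds the comparison key in the order used above; smaller is better.
func score(v nodeVictims) [4]int64 {
	var sum int64
	for _, p := range v.Priorities {
		sum += int64(p) + int64(math.MaxInt32) + 1 // same offset as the original
	}
	return [4]int64{int64(v.PDBViolations), int64(v.Priorities[0]), sum, int64(len(v.Priorities))}
}

// less compares two keys lexicographically.
func less(a, b [4]int64) bool {
	for i := range a {
		if a[i] != b[i] {
			return a[i] < b[i]
		}
	}
	return false
}

// pickNode returns the node with the smallest key, or "" for empty input.
func pickNode(candidates map[string]nodeVictims) string {
	names := make([]string, 0, len(candidates))
	for n := range candidates {
		names = append(names, n)
	}
	sort.Strings(names)
	best := ""
	var bestKey [4]int64
	for _, n := range names {
		v := candidates[n]
		if len(v.Priorities) == 0 {
			return n // a node that needs no evictions wins immediately
		}
		k := score(v)
		if best == "" || less(k, bestKey) {
			best, bestKey = n, k
		}
	}
	return best
}

func main() {
	nodes := map[string]nodeVictims{
		"node-a": {Priorities: []int32{100}, PDBViolations: 1},
		"node-b": {Priorities: []int32{500, 10}},
		"node-c": {Priorities: []int32{200}},
	}
	fmt.Println(pickNode(nodes)) // node-c
}
```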
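4. Summary
What can I say? There really should be a summary here. The preemption logic turned out to be more complex than I expected, and the design is clever, flowing and thoroughly satisfying! Loosely speaking, preemption is a second round of predicates followed by a second round of priorities. I'll stop here, though: after writing this much in one day I can't sit still any longer. Next time we'll keep talking about the scheduler~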
