经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Kubernetes » 查看文章
Kubernetes GoRoutineMap工具包代码详解
来源:cnblogs  作者:人艰不拆_zmc  时间:2023/5/29 21:41:18  对本文有异议

1、概述

GoRoutineMap 定义了一种类型,可以运行具有名称的 goroutine 并跟踪它们的状态。它防止创建具有相同名称的多个goroutine,并且在上一个具有该名称的 goroutine 完成后的一段退避时间内可能阻止重新创建 goroutine。

使用GoRoutineMap场景:

  • 使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。

使用GoRoutineMap大体步骤如下:

1)通过goRoutineMap.NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {....}方法创建GoRoutineMap结构体对象,用于管理goroutine 并跟踪它们的状态;

2)调用GoRoutineMap结构体对象Run(operationName, operation)方法,其能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。

注意 1:本文代码基于Kubernetes 1.24.10版本,包路径kubernetes-1.24.10/pkg/util/goroutinemap/goroutinemap.go。

注意 2:概述中涉及的代码会在下文进行详细解释。

2、goroutinemap工具包代码详解

2.1 相关类型详解

GoRoutineMap工具包接口定义:

  1. type GoRoutineMap interface {
  2. // Run adds operation name to the list of running operations and spawns a
  3. // new go routine to execute the operation.
  4. // If an operation with the same operation name already exists, an
  5. // AlreadyExists or ExponentialBackoff error is returned.
  6. // Once the operation is complete, the go routine is terminated and the
  7. // operation name is removed from the list of executing operations allowing
  8. // a new operation to be started with the same operation name without error.
  9. Run(operationName string, operationFunc func() error) error
  10.  
  11. // Wait blocks until operations map is empty. This is typically
  12. // necessary during tests - the test should wait until all operations finish
  13. // and evaluate results after that.
  14. Wait()
  15.  
  16. // WaitForCompletion blocks until either all operations have successfully completed
  17. // or have failed but are not pending. The test should wait until operations are either
  18. // complete or have failed.
  19. WaitForCompletion()
  20.  
  21. IsOperationPending(operationName string) bool
  22. }

goRoutineMap结构体实现GoRoutineMap接口,定义如下:

  1. // goRoutineMap结构体实现GoRoutineMap接口,
  2. type goRoutineMap struct {
  3. // 用于记录goRoutineMap维护协程的状态
  4. operations map[string]operation
  5. // 发生错误时是否指数级补偿
  6. exponentialBackOffOnError bool
  7. // 用在多个 Goroutine 等待,一个 Goroutine 通知(事件发生)的场景
  8. cond *sync.Cond
  9. lock sync.RWMutex
  10. }
  11.  
  12. // operation结构体对象维护单个goroutine的状态。
  13. type operation struct {
  14. // 是否操作挂起
  15. operationPending bool
  16. // 单个goroutine执行逻辑报错时,实现以指数退避方式
  17. expBackoff exponentialbackoff.ExponentialBackoff
  18. }

ExponentialBackoff结构体包含最后一次出现的错误、最后一次出现错误的时间以及不允许重试的持续时间。

  1. // ExponentialBackoff contains the last occurrence of an error and the duration
  2. // that retries are not permitted.
  3. type ExponentialBackoff struct {
  4. lastError error
  5. lastErrorTime time.Time
  6. durationBeforeRetry time.Duration
  7. }

2.2 GoRoutineMap结构体对象初始化

通过goRoutineMap.NewGoRoutineMap方法创建GoRoutineMap结构体对象,用于管理goroutine 并跟踪它们的状态;  

  1. // NewGoRoutineMap returns a new instance of GoRoutineMap.
  2. func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
  3. g := &goRoutineMap{
  4. operations: make(map[string]operation),
  5. exponentialBackOffOnError: exponentialBackOffOnError,
  6. }
  7.  
  8. g.cond = sync.NewCond(&g.lock)
  9. return g
  10. }

2.3  GoRoutineMap.Run方法代码详解

调用GoRoutineMap结构体对象Run(operationName, operation)方法,其能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。

  1. // Run函数是外部函数,是goRoutineMap核心方法,其能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑
  2. // 如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。
  3. func (grm *goRoutineMap) Run(
  4. operationName string,
  5. operationFunc func() error) error {
  6. grm.lock.Lock()
  7. defer grm.lock.Unlock()
  8.  
  9. // 判断grm.operations这个map中是否存在具有operationName名称的协程
  10. existingOp, exists := grm.operations[operationName]
  11. if exists {
  12. // 如果grm.operations这个map中已经存在operationName名称的协程,并且existingOp.operationPending==true,说明grm.operations中operationName名称这个协程正在执行函数逻辑,在这期间又有一个同名的
  13. // operationName希望加入grm.operations这个map,此时加入map失败并报AlreadyExistsError错误
  14. if existingOp.operationPending {
  15. return NewAlreadyExistsError(operationName)
  16. }
  17.  
  18. // 到这步说明名称为operationName名称的协程执行函数逻辑失败,此时判断此协程最后一次失败时间 + 指数退避的时间 >= 当前时间,如果不符合条件的话禁止执行该协程函数逻辑。
  19. if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
  20. return err
  21. }
  22. }
  23.  
  24. // 如果grm.operations这个map中不存在operationName名称的协程 或者 此协程最后一次失败时间 + 指数退避的时间 < 当前时间,则在grm.operations这个map中重新维护此协程(注意,operationPending=true)
  25. grm.operations[operationName] = operation{
  26. operationPending: true,
  27. expBackoff: existingOp.expBackoff,
  28. }
  29.  
  30. // 以协程方式执行函数逻辑operationFunc()
  31. go func() (err error) {
  32. // 捕获崩溃并记录错误,默认不传参的话,在程序发送崩溃时,在控制台打印一下崩溃日志后再崩溃,方便技术人员排查程序错误。
  33. defer k8sRuntime.HandleCrash()
  34.  
  35. // 如果执行operationFunc()函数逻辑不报错或者grm.exponentialBackOffOnError=false的话,将从grm.operations这个map中移除此operationName名称协程;
  36. // 如果执行operationFunc()函数逻辑报错并且grm.exponentialBackOffOnError=true,则将产生指数级补偿,到达补偿时间后才能再调用此operationName名称协程的函数逻辑
  37. // Handle completion of and error, if any, from operationFunc()
  38. defer grm.operationComplete(operationName, &err)
  39. // 处理operationFunc()函数发生的panic错误,以便defer grm.operationComplete(operationName, &err)能执行
  40. // Handle panic, if any, from operationFunc()
  41. defer k8sRuntime.RecoverFromPanic(&err)
  42. return operationFunc()
  43. }()
  44.  
  45. return nil
  46. }

如果给定lastErrorTime的durationBeforeRetry周期尚未过期,则SafeToRetry返回错误。否则返回零。

  1. // SafeToRetry returns an error if the durationBeforeRetry period for the given
  2. // lastErrorTime has not yet expired. Otherwise it returns nil.
  3. func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
  4. if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
  5. return NewExponentialBackoffError(operationName, *expBackoff)
  6. }
  7.  
  8. return nil
  9. }

operationComplete是一个内部函数,用于处理在goRoutineMap中已经运行完函数逻辑的协程。

  1. // operationComplete是一个内部函数,用于处理在goRoutineMap中已经运行完函数逻辑的协程
  2. // 如果执行operationFunc()函数逻辑不报错或者grm.exponentialBackOffOnError=false的话,将从grm.operations这个map中移除此operationName名称协程;
  3. // 如果执行operationFunc()函数逻辑报错并且grm.exponentialBackOffOnError=true,则将产生指数级补偿,到达补偿时间后才能再调用此operationName名称协程的函数逻辑
  4. // operationComplete handles the completion of a goroutine run in the
  5. // goRoutineMap.
  6. func (grm *goRoutineMap) operationComplete(
  7. operationName string, err *error) {
  8. // Defer operations are executed in Last-In is First-Out order. In this case
  9. // the lock is acquired first when operationCompletes begins, and is
  10. // released when the method finishes, after the lock is released cond is
  11. // signaled to wake waiting goroutine.
  12. defer grm.cond.Signal()
  13. grm.lock.Lock()
  14. defer grm.lock.Unlock()
  15.  
  16. if *err == nil || !grm.exponentialBackOffOnError {
  17. // 函数逻辑执行完成无错误或已禁用错误指数级补偿,将从grm.operations这个map中移除此operationName名称协程;
  18. // Operation completed without error, or exponentialBackOffOnError disabled
  19. delete(grm.operations, operationName)
  20. if *err != nil {
  21. // Log error
  22. klog.Errorf("operation for %q failed with: %v",
  23. operationName,
  24. *err)
  25. }
  26. } else {
  27. // 函数逻辑执行完成有错误则将产生指数级补偿,到达补偿时间后才能再调用此operationName名称协程的函数逻辑(注意,指数补充的协程,operationPending=false)
  28. // Operation completed with error and exponentialBackOffOnError Enabled
  29. existingOp := grm.operations[operationName]
  30. existingOp.expBackoff.Update(err)
  31. existingOp.operationPending = false
  32. grm.operations[operationName] = existingOp
  33.  
  34. // Log error
  35. klog.Errorf("%v",
  36. existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
  37. }
  38. }

Update是一个外部函数,用于计算指数级别的退避时间。

  1. func (expBackoff *ExponentialBackoff) Update(err *error) {
  2. if expBackoff.durationBeforeRetry == 0 {
  3. expBackoff.durationBeforeRetry = initialDurationBeforeRetry
  4. } else {
  5. expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
  6. if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
  7. expBackoff.durationBeforeRetry = maxDurationBeforeRetry
  8. }
  9. }
  10.  
  11. expBackoff.lastError = *err
  12. expBackoff.lastErrorTime = time.Now()
  13. }

3、总结

本文对Kubernetes GoRoutineMap工具包代码进行了详解,通过 GoRoutineMap工具包能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。使用Kubernetes GoRoutineMap包的好处包括以下几点:

  1. 减轻负载:当出现错误时,使用指数退避时间可以避免过于频繁地重新尝试操作,从而减轻系统的负载。指数退避时间通过逐渐增加重试之间的等待时间,有效地减少了对系统资源的过度使用。

  2. 提高稳定性:通过逐渐增加重试之间的等待时间,指数退避时间可以帮助应对瞬时的故障或错误。这种策略使得系统能够在短时间内自动恢复,并逐渐增加重试频率,直到操作成功为止。这有助于提高应用程序的稳定性和可靠性。

  3. 降低网络拥塞:当网络出现拥塞时,频繁地进行重试可能会加重拥塞问题并影响其他任务的正常运行。指数退避时间通过增加重试之间的等待时间,可以降低对网络的额外负载,有助于缓解网络拥塞问题。

  4. 避免过早放弃:某些错误可能是瞬时的或暂时性的,因此过早放弃重试可能会导致不必要的失败。指数退避时间确保了在错误发生时进行适当的重试,以便系统有更多机会恢复并成功完成操作。

综上所述,使用Kubernetes GoRoutineMap工具包以协程方式处理函数逻辑可以提高系统的可靠性、稳定性和性能,减轻负载并有效应对错误和故障情况。这是在Kubernetes中实施的一种常见的重试策略,常用于处理容器化应用程序中的操作错误。

原文链接:https://www.cnblogs.com/zhangmingcheng/p/17439413.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号