经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
Golang开源定时任务调度框架robfig/cron优化
来源:cnblogs  作者:丶Blank  时间:2021/6/15 9:17:17  对本文有异议

项目中需要使用一个简单的定时任务调度的框架,最初直接从GitHub上搜了一个star比较多的,就是 https://github.com/robfig/cron 这个,目前有8000+ star。刚开始使用的时候发现问题不大,但是随着单机需要定时调度的任务越来越多,高峰期差不多接近500QPS,随着业务的推广使用,可以预期增长还会比较快,但是已经遇到CPU使用率偏高的问题,通过pprof分析,很多都是在做排序,看了下这个项目的代码,整体执行的过程大概如下:

1. 对所有任务进行排序,按照下次执行时间进行排序

2. 选择数组中第一个任务,计算下次执行时间减去当前时间得到时间t,然后sleep t

3. 然后从数组第一个元素开始遍历任务,如果此任务需要调度的时间<now,那么就执行此任务,执行之后重新计算这个任务的next执行时间

4. 每次待执行的任务执行完毕之后,都会重新对这个数组进行排序

5. 然后再循环从排好序的数组中找到第一个需要执行的任务去执行。

 

 

代码如下:

  1. for {
  2. // Determine the next entry to run.
  3. sort.Sort(byTime(c.entries))
  4. var timer *time.Timer
  5. if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
  6. // If there are no entries yet, just sleep - it still handles new entries
  7. // and stop requests.
  8. timer = time.NewTimer(100000 * time.Hour)
  9. } else {
  10. timer = time.NewTimer(c.entries[0].Next.Sub(now))
  11. }
  12. for {
  13. select {
  14. case now = <-timer.C:
  15. now = now.In(c.location)
  16. c.logger.Info("wake", "now", now)
  17. // Run every entry whose next time was less than now
  18. for _, e := range c.entries {
  19. if e.Next.After(now) || e.Next.IsZero() {
  20. break
  21. }
  22. c.startJob(e.WrappedJob)
  23. e.Prev = e.Next
  24. e.Next = e.Schedule.Next(now)
  25. c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
  26. }
  27. case newEntry := <-c.add:
  28. timer.Stop()
  29. now = c.now()
  30. newEntry.Next = newEntry.Schedule.Next(now)
  31. c.entries = append(c.entries, newEntry)
  32. c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
  33. case replyChan := <-c.snapshot:
  34. replyChan <- c.entrySnapshot()
  35. continue
  36.  
  37. case <-c.stop:
  38. timer.Stop()
  39. c.logger.Info("stop")
  40. return
  41.  
  42. case id := <-c.remove:
  43. timer.Stop()
  44. now = c.now()
  45. c.removeEntry(id)
  46. c.logger.Info("removed", "entry", id)
  47. }
  48. break
  49. }
  50. }

问题就显而易见了,执行一个任务(或几个任务)都重新计算next执行时间,重新排序,最坏情况就是每次执行1个任务,排序一遍,那么执行k个任务需要的时间的时间复杂度就是O(k*nlogn),这无疑是非常低效的。

于是想着怎么优化一下这个框架,不难想到每次找最先需要执行的任务就是从一堆任务中找schedule_time最小的那一个(设schedule_time是任务的执行时间),那么比较容易想到的思路就是使用最小堆:

1. 在初始化任务列表的时候就直接构建一个最小堆

2. 每次执行查看peek元素是否需要执行

3. 需要执行就pop堆顶元素,计算next执行时间,重新push入堆

4. 不需要执行就break到外层循环取堆顶元素,计算next_time-now() = need_sleep_time,然后select 睡眠、add、remove等操作。

 

我修改为min-heap的方式之后,每次添加任务的时候通过堆的属性进行up和down调整,每次添加任务时间复杂度O(logn),执行k个任务时间复杂度是O(klogn)。经过验证线上CPU使用降低4~5倍。CPU从50%左右降低至10%左右

 

优化后的代码如下,只是其中一部分,关键部分已经高亮。

全部的代码也已经在github上已经创建了一个Fork的仓库并推送上去了,全部单测Case也都PASS。感兴趣可以点过去看。https://github.com/tovenja/cron

  1. for {
  2. // Determine the next entry to run.
  3. // Use min-heap no need sort anymore


         // 这里不再需要排序了,因为add的时候直接进行堆调整
         //
    sort.Sort(byTime(c.entries))
  4.  
  5. var timer *time.Timer
  6. if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
  7. // If there are no entries yet, just sleep - it still handles new entries
  8. // and stop requests.
  9. timer = time.NewTimer(100000 * time.Hour)
  10. } else {
  11. timer = time.NewTimer(c.entries[0].Next.Sub(now))
  12. //fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
  13. }
  14. for {
  15. select {
  16. case now = <-timer.C:
  17. now = now.In(c.location)
  18. c.logger.Info("wake", "now", now)
  19. // Run every entry whose next time was less than now
  20. for {
  21. e := c.entries.Peek()
  22. if e.Next.After(now) || e.Next.IsZero() {
  23. break
  24. }
  25. e = heap.Pop(&c.entries).(*Entry)
  26. c.startJob(e.WrappedJob)
  27. e.Prev = e.Next
  28. e.Next = e.Schedule.Next(now)
  29. heap.Push(&c.entries, e)
  30. c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
  31. }
  32. case newEntry := <-c.add:
  33. timer.Stop()
  34. now = c.now()
  35. newEntry.Next = newEntry.Schedule.Next(now)
  36. heap.Push(&c.entries, newEntry)
  37. c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
  38. case replyChan := <-c.snapshot:
  39. replyChan <- c.entrySnapshot()
  40. continue
  41.  
  42. case <-c.stop:
  43. timer.Stop()
  44. c.logger.Info("stop")
  45. return
  46.  
  47. case id := <-c.remove:
  48. timer.Stop()
  49. now = c.now()
  50. c.removeEntry(id)
  51. c.logger.Info("removed", "entry", id)
  52. }
  53. break
  54. }
  55. }

 

 

原文链接:http://www.cnblogs.com/aboutblank/p/14860571.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号