经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
go-zero 是如何实现令牌桶限流的?
来源:cnblogs  作者:yongxinz  时间:2023/8/11 8:35:21  对本文有异议

原文链接:

上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。

但是采用固定窗口实现的限流器会有两个问题:

  1. 会出现请求量超出限制值两倍的情况
  2. 无法很好处理流量突增问题

这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题。

工作原理

算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。

源码实现

源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现。

  1. // core/limit/tokenlimit.go
  2. // 生成 token 速率
  3. script = `local rate = tonumber(ARGV[1])
  4. // 通容量
  5. local capacity = tonumber(ARGV[2])
  6. // 当前时间戳
  7. local now = tonumber(ARGV[3])
  8. // 请求数量
  9. local requested = tonumber(ARGV[4])
  10. // 需要多少秒才能把桶填满
  11. local fill_time = capacity/rate
  12. // 向下取整,ttl 为填满时间 2 倍
  13. local ttl = math.floor(fill_time*2)
  14. // 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量
  15. local last_tokens = tonumber(redis.call("get", KEYS[1]))
  16. if last_tokens == nil then
  17. last_tokens = capacity
  18. end
  19. // 上次请求时间戳,如果为 nil 则赋值 0
  20. local last_refreshed = tonumber(redis.call("get", KEYS[2]))
  21. if last_refreshed == nil then
  22. last_refreshed = 0
  23. end
  24. // 距离上一次请求的时间跨度
  25. local delta = math.max(0, now-last_refreshed)
  26. // 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
  27. // 与桶容量比较,取二者的小值
  28. local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
  29. // 判断请求数量和桶内 token 数量的大小
  30. local allowed = filled_tokens >= requested
  31. // 被请求消耗掉之后,更新剩余 token 数量
  32. local new_tokens = filled_tokens
  33. if allowed then
  34. new_tokens = filled_tokens - requested
  35. end
  36. // 更新 redis token
  37. redis.call("setex", KEYS[1], ttl, new_tokens)
  38. // 更新 redis 刷新时间
  39. redis.call("setex", KEYS[2], ttl, now)
  40. return allowed`

Redis 中主要保存两个 key,分别是 token 数量和刷新时间。

核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许。

限流器初始化:

  1. // A TokenLimiter controls how frequently events are allowed to happen with in one second.
  2. type TokenLimiter struct {
  3. // 生成 token 速率
  4. rate int
  5. // 桶容量
  6. burst int
  7. store *redis.Redis
  8. // 桶 key
  9. tokenKey string
  10. // 桶刷新时间 key
  11. timestampKey string
  12. rescueLock sync.Mutex
  13. // redis 健康标识
  14. redisAlive uint32
  15. // redis 健康监控启动状态
  16. monitorStarted bool
  17. // 内置单机限流器
  18. rescueLimiter *xrate.Limiter
  19. }
  20. // NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
  21. // bursts of at most burst tokens.
  22. func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
  23. tokenKey := fmt.Sprintf(tokenFormat, key)
  24. timestampKey := fmt.Sprintf(timestampFormat, key)
  25. return &TokenLimiter{
  26. rate: rate,
  27. burst: burst,
  28. store: store,
  29. tokenKey: tokenKey,
  30. timestampKey: timestampKey,
  31. redisAlive: 1,
  32. rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
  33. }
  34. }

其中有一个变量 rescueLimiter,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮。

提供了四个可调用方法:

  1. // Allow is shorthand for AllowN(time.Now(), 1).
  2. func (lim *TokenLimiter) Allow() bool {
  3. return lim.AllowN(time.Now(), 1)
  4. }
  5. // AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
  6. func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
  7. return lim.AllowNCtx(ctx, time.Now(), 1)
  8. }
  9. // AllowN reports whether n events may happen at time now.
  10. // Use this method if you intend to drop / skip events that exceed the rate.
  11. // Otherwise, use Reserve or Wait.
  12. func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
  13. return lim.reserveN(context.Background(), now, n)
  14. }
  15. // AllowNCtx reports whether n events may happen at time now with incoming context.
  16. // Use this method if you intend to drop / skip events that exceed the rate.
  17. // Otherwise, use Reserve or Wait.
  18. func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
  19. return lim.reserveN(ctx, now, n)
  20. }

最终调用的都是 reverveN 方法:

  1. func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
  2. // 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器
  3. if atomic.LoadUint32(&lim.redisAlive) == 0 {
  4. return lim.rescueLimiter.AllowN(now, n)
  5. }
  6. // 执行限流脚本
  7. resp, err := lim.store.EvalCtx(ctx,
  8. script,
  9. []string{
  10. lim.tokenKey,
  11. lim.timestampKey,
  12. },
  13. []string{
  14. strconv.Itoa(lim.rate),
  15. strconv.Itoa(lim.burst),
  16. strconv.FormatInt(now.Unix(), 10),
  17. strconv.Itoa(n),
  18. })
  19. // redis allowed == false
  20. // Lua boolean false -> r Nil bulk reply
  21. if err == redis.Nil {
  22. return false
  23. }
  24. if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
  25. logx.Errorf("fail to use rate limiter: %s", err)
  26. return false
  27. }
  28. if err != nil {
  29. logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
  30. // 如果有异常的话,会启动进程内限流
  31. lim.startMonitor()
  32. return lim.rescueLimiter.AllowN(now, n)
  33. }
  34. code, ok := resp.(int64)
  35. if !ok {
  36. logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
  37. lim.startMonitor()
  38. return lim.rescueLimiter.AllowN(now, n)
  39. }
  40. // redis allowed == true
  41. // Lua boolean true -> r integer reply with value of 1
  42. return code == 1
  43. }

最后看一下进程内限流的启动与恢复:

  1. func (lim *TokenLimiter) startMonitor() {
  2. lim.rescueLock.Lock()
  3. defer lim.rescueLock.Unlock()
  4. // 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动
  5. if lim.monitorStarted {
  6. return
  7. }
  8. lim.monitorStarted = true
  9. atomic.StoreUint32(&lim.redisAlive, 0)
  10. go lim.waitForRedis()
  11. }
  12. func (lim *TokenLimiter) waitForRedis() {
  13. ticker := time.NewTicker(pingInterval)
  14. // 更新监控进程的状态
  15. defer func() {
  16. ticker.Stop()
  17. lim.rescueLock.Lock()
  18. lim.monitorStarted = false
  19. lim.rescueLock.Unlock()
  20. }()
  21. for range ticker.C {
  22. // 对 redis 进行健康监测,如果 redis 服务恢复了
  23. // 则更新 redisAlive 标识,并退出 goroutine
  24. if lim.store.Ping() {
  25. atomic.StoreUint32(&lim.redisAlive, 1)
  26. return
  27. }
  28. }
  29. }

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

推荐阅读:

原文链接:https://www.cnblogs.com/alwaysbeta/p/17621475.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号