经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python » 查看文章
python重试库retryiny源码剖析
来源:cnblogs  作者:killianxu  时间:2018/10/22 16:28:56  对本文有异议

  上篇博文介绍了常见需要进行请求重试的场景,本篇博文试着剖析有名的python第三方库retrying源码。

   在剖析其源码之前,有必要讲一下retrying的用法,方便理解。

   安装:

  pip install retrying

  或者

  easy_install retrying

  一些用法实例如下:

  1. #example 1
  2. from retrying import retry
  3. @retry
  4. def never_give_up_never_surrender():
  5. print "一直重试且两次重试之间无需等待"
  1. #example 2
  2. from retrying import retry
  3. @retry(stop_max_attempt_number=7)
  4. def stop_after_7_attempts():
  5. print "重试七次后停止"
  1. #example 3
  2. from retrying import retry
  3. @retry(stop_max_delay=10000)
  4. def stop_after_10_s():
  5. print "十秒之后停止重试"
  1. #example 4
  2. from retrying import retry
  3. @retry(wait_fixed=2000)
  4. def wait_2_s():
  5. print "每次重试间隔两秒"
  1. #example 5
  2. from retrying import retry
  3. @retry(wait_random_min=1000, wait_random_max=2000)
  4. def wait_random_1_to_2_s():
  5. print "每次重试随机等待1到2秒"
  1. #example 6
  2. from retrying import retry
  3. @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
  4. def wait_exponential_1000():
  5. print "指数退避,每次重试等待 2^x * 1000 毫秒,上限是10秒,达到上限后每次都等待10秒"

 

  1. #example 7
  2. def retry_if_io_error(exception):
  3. """Return True if we should retry (in this case when it's an IOError), False otherwise"""
  4. return isinstance(exception, IOError)
  5. @retry(retry_on_exception=retry_if_io_error)
  6. def might_io_error():
  7. print "IO异常则重试,并且将其它异常抛出"
  8. @retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
  9. def only_raise_retry_error_when_not_io_error():
  10. print "IO异常则重试,并且将其它异常用RetryError对象包裹"
  1. #exampe 8,根据返回结果判断是否重试
  2. def retry_if_result_none(result):
  3. """Return True if we should retry (in this case when result is None), False otherwise"""
  4. return result is None
  5. @retry(retry_on_result=retry_if_result_none)
  6. def might_return_none():
  7. print "若返回结果为None则重试"

  上面八个例子是retrying的用法,只需在要重试的方法上加上@retry注解,并以相应的条件为参数即可,那么@retry背后到底是如何实现的呢?下面给出@retry注解实现的方法。

  1. 1 #装饰器模式,对需要重试的函数,利用retry注解返回
  2. 2 def retry(*dargs, **dkw):
  3. 3 """
  4. 4 Decorator function that instantiates the Retrying object
  5. 5 @param *dargs: positional arguments passed to Retrying object
  6. 6 @param **dkw: keyword arguments passed to the Retrying object
  7. 7 """
  8. 8 # support both @retry and @retry() as valid syntax
  9. 9 #当用法为@retry不带括号时走这条路径,dargs[0]为retry注解的函数,返回函数对象wrapped_f
  10. 10 if len(dargs) == 1 and callable(dargs[0]):
  11. 11 def wrap_simple(f):
  12. 12
  13. 13 @six.wraps(f)#注解用于将函数f的签名复制到新函数wrapped_f
  14. 14 def wrapped_f(*args, **kw):
  15. 15 return Retrying().call(f, *args, **kw)
  16. 16
  17. 17 return wrapped_f
  18. 18
  19. 19 return wrap_simple(dargs[0])
  20. 20
  21. 21 else:#当用法为@retry()带括号时走这条路径,返回函数对象wrapped_f
  22. 22 def wrap(f):
  23. 23
  24. 24 @six.wraps(f)#注解用于将函数f的签名复制到新函数wrapped_f
  25. 25 def wrapped_f(*args, **kw):
  26. 26 return Retrying(*dargs, **dkw).call(f, *args, **kw)
  27. 27
  28. 28 return wrapped_f
  29. 29
  30. 30 return wrap

  当用@retry标记函数时,例如实例1,其实执行了

  1. never_give_up_never_surrender = retry(never_give_up_never_surrender)

  此时的never_give_up_never_surrender函数实际上是10-19行返回的wrapped_f函数,后续对never_give_up_never_surrender函数的调用都是调用的14行的wrapped_f函数。

当使用@retry()或者带参数的@retry(params)时,如实例2,实际执行了:

  1. stop_after_7_attempts = retry(stop_max_attempt_number)(stop_after_7_attempts)

  此时的stop_after_7_attempts函数实际上是22-29行的wrapped_f函数,后续对stop_after_7_attempts函数的调用都是对25行的wrapped_f函数调用。

可以看到实际上@retry将对需要重试的函数调用转化为对Retrying类中call函数的调用,重试逻辑也在这个函数实现,实现对逻辑代码的无侵入,代码如下:

 

  1. 1 def call(self, fn, *args, **kwargs):
  2. 2 start_time = int(round(time.time() * 1000))
  3. 3 attempt_number = 1
  4. 4 while True:
  5. 5 #_before_attempts为@retry传进来的before_attempts,在每次调用函数前执行一些操作
  6. 6 if self._before_attempts:
  7. 7 self._before_attempts(attempt_number)
  8. 8
  9. 9 try:#Attempt将函数执行结果或者异常信息以及执行次数作为内部状态,用TrueFalse标记是内部存的值正常执行结果还是异常
  10. 10 attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
  11. 11 except:
  12. 12 tb = sys.exc_info()#获取异常堆栈信息,sys.exc_info()返回type(异常类型), value(异常说明), traceback(traceback对象,包含更丰富的信息)
  13. 13 attempt = Attempt(tb, attempt_number, True)
  14. 14
  15. 15 if not self.should_reject(attempt):#根据本次执行结果或异常类型判断是否应该停止
  16. 16 return attempt.get(self._wrap_exception)
  17. 17
  18. 18 if self._after_attempts:#_after_attempts@retry传进来的after_attempts,在每次调用函数后执行一些操作
  19. 19 self._after_attempts(attempt_number)
  20. 20
  21. 21 delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
  22. 22 if self.stop(attempt_number, delay_since_first_attempt_ms):#根据重试次数和延迟判断是否应该停止
  23. 23 if not self._wrap_exception and attempt.has_exception:
  24. 24 # get() on an attempt with an exception should cause it to be raised, but raise just in case
  25. 25 raise attempt.get()
  26. 26 else:
  27. 27 raise RetryError(attempt)
  28. 28 else:#不停止则等待一定时间,延迟时间根据wait函数返回值和_wait_jitter_max计算
  29. 29 sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
  30. 30 if self._wait_jitter_max:
  31. 31 jitter = random.random() * self._wait_jitter_max
  32. 32 sleep = sleep + max(0, jitter)
  33. 33 time.sleep(sleep / 1000.0)
  34. 34
  35. 35 attempt_number += 1 #进行下一轮重试

  9-13行将函数执行返回结果或异常存入Attempt对象attempt中,Attempt类如下:

  1. class Attempt(object):
  2. """
  3. An Attempt encapsulates a call to a target function that may end as a
  4. normal return value from the function or an Exception depending on what
  5. occurred during the execution.
  6. """
  7. #value值为函数返回结果或异常,根据has_exception判断
  8. def __init__(self, value, attempt_number, has_exception):
  9. self.value = value
  10. self.attempt_number = attempt_number
  11. self.has_exception = has_exception
  12. #返回函数执行结果或异常,并根据wrap_exception参数对异常用RetryError包裹
  13. def get(self, wrap_exception=False):
  14. """
  15. Return the return value of this Attempt instance or raise an Exception.
  16. If wrap_exception is true, this Attempt is wrapped inside of a
  17. RetryError before being raised.
  18. """
  19. if self.has_exception:
  20. if wrap_exception:
  21. raise RetryError(self)
  22. else:#重新构造原异常抛出
  23. six.reraise(self.value[0], self.value[1], self.value[2])
  24. else:
  25. return self.value
  26. def __repr__(self):
  27. if self.has_exception:
  28. return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
  29. else:
  30. return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)

  15行根据should_reject函数的返回值判断是否停止重试,代码如下:

  1. def should_reject(self, attempt):
  2. reject = False
  3. #假如异常在retry_on_exception参数中返回True,则重试,默认不传异常参数时,发生异常一直重试
  4. if attempt.has_exception:
  5. reject |= self._retry_on_exception(attempt.value[1])
  6. else:#假如函数返回结果在retry_on_result参数函数中为True,则重试
  7. reject |= self._retry_on_result(attempt.value)
  8. return reject

 

  22行根据重试次数和延迟判断是否应该停止重试,self.stop的赋值代码在构造函数中,代码片段如下:

  1. stop_funcs = []
  2. if stop_max_attempt_number is not None:
  3. stop_funcs.append(self.stop_after_attempt)
  4. if stop_max_delay is not None:
  5. stop_funcs.append(self.stop_after_delay)
  6. if stop_func is not None:
  7. self.stop = stop_func
  8. elif stop is None:#执行次数和延迟任何一个达到限制则停止
  9. self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)
  10. else:
  11. self.stop = getattr(self, stop)


  1. def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms):
  2. """Stop after the previous attempt >= stop_max_attempt_number."""
  3. return previous_attempt_number >= self._stop_max_attempt_number
  4. def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms):
  5. """Stop after the time from the first attempt >= stop_max_delay."""
  6. return delay_since_first_attempt_ms >= self._stop_max_delay

  29-33行等待一段时间再次重试,其中延迟时间重点是根据29行的wait函数计算,wait函数在构造函数中赋值,代码片段如下:

  1. wait_funcs = [lambda *args, **kwargs: 0]
  2. if wait_fixed is not None:
  3. wait_funcs.append(self.fixed_sleep)
  4. if wait_random_min is not None or wait_random_max is not None:
  5. wait_funcs.append(self.random_sleep)
  6. if wait_incrementing_start is not None or wait_incrementing_increment is not None:
  7. wait_funcs.append(self.incrementing_sleep)
  8. if wait_exponential_multiplier is not None or wait_exponential_max is not None:
  9. wait_funcs.append(self.exponential_sleep)
  10. if wait_func is not None:
  11. self.wait = wait_func
  12. elif wait is None:#返回几个函数的最大值,作为等待时间
  13. self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)
  14. else:
  15. self.wait = getattr(self, wait)

  其中最值得研究的是指数退避延迟时间计算方法,函数为exponential_sleep,代码如下:

  1. def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
  2. exp = 2 ** previous_attempt_number
  3. result = self._wait_exponential_multiplier * exp #延迟时间为_wait_exponential_multiplier*2^x
  4. if result > self._wait_exponential_max:#假如大于退避上限_wait_exponential_max,则result为上限值
  5. result = self._wait_exponential_max
  6. if result < 0:
  7. result = 0
  8. return result

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号