经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python » 查看文章
Python Asyncio调度原理详情
来源:jb51  时间:2022/6/27 14:13:52  对本文有异议

前言

在文章《Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用》中介绍了Python的可等待对象作用,特别是Task对象在启动的时候可以自我驱动,但是一个Task对象只能驱动一条执行链,如果要多条链执行(并发),还是需要EventLoop来安排驱动,接下来将通过Python.Asyncio库的源码来了解EventLoop是如何运作的。

1.基本介绍

Python.Asyncio是一个大而全的库,它包括很多功能,而跟核心调度相关的逻辑除了三种可等待对象外,还有其它一些功能,它们分别位于runners.pybase_event.pyevent.py三个文件中。

runners.py文件有一个主要的类--Runner,它的主要职责是做好进入协程模式的事件循环等到初始化工作,以及在退出协程模式时清理还在内存的协程,生成器等对象。

协程模式只是为了能方便理解,对于计算机而言,并没有这样区分

event.py文件除了存放着EventLoop对象的接口以及获取和设置EventLoop的函数外,还有两个EventLoop可调度的对象,分别为HandlerTimerHandler,它们可以认为是EvnetLoop调用其它对象的容器,用于连接待调度对象和事件循环的关系,不过它们的实现非常简单,对于Handler它的源码如下:

  1. # 已经移除了一些不想关的代码
  2. class Handle:
  3. def __init__(self, callback, args, loop, context=None):
  4. # 初始化上下文,确保执行的时候能找到Handle所在的上下文
  5. if context is None:
  6. context = contextvars.copy_context()
  7. self._context = context
  8. self._loop = loop
  9. self._callback = callback
  10. self._args = args
  11. self._cancelled = False
  12.  
  13. def cancel(self):
  14. # 设置当前Handle为取消状态
  15. if not self._cancelled:
  16. self._cancelled = True
  17. self._callback = None
  18. self._args = None
  19. def cancelled(self):
  20. return self._cancelled
  21. def _run(self):
  22. # 用于执行真正的函数,且通过context.run方法来确保在自己的上下文内执行。
  23. try:
  24. # 保持在自己持有的上下文中执行对应的回调
  25. self._context.run(self._callback, *self._args)
  26. except (SystemExit, KeyboardInterrupt):
  27. raise
  28. except BaseException as exc:
  29. cb = format_helpers._format_callback_source(
  30. self._callback, self._args)
  31. msg = f'Exception in callback {cb}'
  32. context = {
  33. 'message': msg,
  34. 'exception': exc,
  35. 'handle': self,
  36. }
  37. self._loop.call_exception_handler(context)

通过源码可以发现,Handle功能十分简单,提供了可以被取消以及可以在自己所处的上下文执行的功能,而TimerHandle继承于HandleHandle多了一些和时间以及排序相关的参数,源码如下:

  1. class TimerHandle(Handle):
  2. def __init__(self, when, callback, args, loop, context=None):
  3. super().__init__(callback, args, loop, context)
  4. self._when = when
  5. self._scheduled = False
  6. def __hash__(self):
  7. return hash(self._when)
  8. def __lt__(self, other):
  9. if isinstance(other, TimerHandle):
  10. return self._when < other._when
  11. return NotImplemented
  12. def __le__(self, other):
  13. if isinstance(other, TimerHandle):
  14. return self._when < other._when or self.__eq__(other)
  15. return NotImplemented
  16. def __gt__(self, other):
  17. if isinstance(other, TimerHandle):
  18. return self._when > other._when
  19. return NotImplemented
  20. def __ge__(self, other):
  21. if isinstance(other, TimerHandle):
  22. return self._when > other._when or self.__eq__(other)
  23. return NotImplemented
  24. def __eq__(self, other):
  25. if isinstance(other, TimerHandle):
  26. return (self._when == other._when and
  27. self._callback == other._callback and
  28. self._args == other._args and
  29. self._cancelled == other._cancelled)
  30. return NotImplemented
  31. def cancel(self):
  32. if not self._cancelled:
  33. # 用于通知事件循环当前Handle已经退出了
  34. self._loop._timer_handle_cancelled(self)
  35. super().cancel()
  36. def when(self):
  37. return self._when

通过代码可以发现,这两个对象十分简单,而我们在使用Python.Asyncio时并不会直接使用到这两个对象,而是通过loop.call_xxx系列方法来把调用封装成Handle对象,然后等待EventLoop执行。 所以loop.call_xxx系列方法可以认为是EventLoop的注册操作,基本上所有非IO的异步操作都需要通过loop.call_xxx方法来把自己的调用注册到EventLoop中,比如Task对象就在初始化后通过调用loop.call_soon方法来注册到EventLoop中,loop.call_sonn的实现很简单,

它的源码如下:

  1. class BaseEventLoop:
  2. ...
  3. def call_soon(self, callback, *args, context=None):
  4. # 检查是否事件循环是否关闭,如果是则直接抛出异常
  5. self._check_closed()
  6. handle = self._call_soon(callback, args, context)
  7. return handle
  8.  
  9. def _call_soon(self, callback, args, context):
  10. # 把调用封装成一个handle,这样方便被事件循环调用
  11. handle = events.Handle(callback, args, self, context)
  12. # 添加一个handle到_ready,等待被调用
  13. self._ready.append(handle)
  14. return handle

可以看到call_soon真正相关的代码只有10几行,它负责把一个调用封装成一个Handle,并添加到self._reday中,从而实现把调用注册到事件循环之中。

loop.call_xxx系列函数除了loop.call_soon系列函数外,还有另外两个方法--loop.call_atloop.call_later,它们类似于loop.call_soon,不过多了一个时间参数,来告诉EventLoop在什么时间后才可以调用,同时通过loop.call_atloop.call_later注册的调用会通过Python的堆排序模块headpq注册到self._scheduled变量中,

具体代码如下:

  1. class BaseEventLoop:
  2. ...
  3. def call_later(self, delay, callback, *args, context=None):
  4. if delay is None:
  5. raise TypeError('delay must not be None')
  6. timer = self.call_at(self.time() + delay, callback, *args, context=context)
  7. return timer
  8.  
  9. def call_at(self, when, callback, *args, context=None):
  10. if when is None:
  11. raise TypeError("when cannot be None")
  12. self._check_closed()
  13. # 创建一个timer handle,然后添加到事件循环的_scheduled中,等待被调用
  14. timer = events.TimerHandle(when, callback, args, self, context)
  15. heapq.heappush(self._scheduled, timer)
  16. timer._scheduled = True
  17. return timer

2.EventLoop的调度实现

在文章《Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用》中已经分析到了runner会通过loop.run_until_complete来调用mainTask从而开启EventLoop的调度,所以在分析EventLoop的调度时,应该先从loop.run_until_complete入手,

对应的源码如下:

  1. class BaseEventLoop:
  2. def run_until_complete(self, future):
  3. ...
  4. new_task = not futures.isfuture(future)
  5. # 把coroutine转换成task,这样事件循环就可以调度了,事件循环的最小调度单位为task
  6. # 需要注意的是此时事件循环并没注册到全局变量中,所以需要显示的传进去,
  7. # 同时Task对象注册的时候,已经通过loop.call_soon把自己注册到事件循环中,等待调度
  8. future = tasks.ensure_future(future, loop=self)
  9. if new_task:
  10. # An exception is raised if the future didn't complete, so there
  11. # is no need to log the "destroy pending task" message
  12. future._log_destroy_pending = False
  13. # 当该task完成时,意味着当前事件循环失去了调度对象,无法继续调度,所以需要关闭当前事件循环,程序会由协程模式返回到线程模式
  14. future.add_done_callback(_run_until_complete_cb)
  15. try:
  16. # 事件循环开始运行
  17. self.run_forever()
  18. except:
  19. if new_task and future.done() and not future.cancelled():
  20. # The coroutine raised a BaseException. Consume the exception
  21. # to not log a warning, the caller doesn't have access to the
  22. # local task.
  23. future.exception()
  24. raise
  25. finally:
  26. future.remove_done_callback(_run_until_complete_cb)
  27. if not future.done():
  28. raise RuntimeError('Event loop stopped before Future completed.')
  29.  
  30. return future.result()
  31.  
  32. def run_forever(self):
  33. # 进行一些初始化工作
  34. self._check_closed()
  35. self._check_running()
  36. self._set_coroutine_origin_tracking(self._debug)
  37. self._thread_id = threading.get_ident()
  38.  
  39. old_agen_hooks = sys.get_asyncgen_hooks()
  40. # 通过asyncgen钩子来自动关闭asyncgen函数,这样可以提醒用户生成器还未关闭
  41. sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
  42. finalizer=self._asyncgen_finalizer_hook)
  43. try:
  44. # 设置当前在运行的事件循环到全局变量中,这样就可以在任一阶段获取到当前的事件循环了
  45. events._set_running_loop(self)
  46. while True:
  47. # 正真执行任务的逻辑
  48. self._run_once()
  49. if self._stopping:
  50. break
  51. finally:
  52. # 关闭循环, 并且清理一些资源
  53. self._stopping = False
  54. self._thread_id = None
  55. events._set_running_loop(None)
  56. self._set_coroutine_origin_tracking(False)
  57. sys.set_asyncgen_hooks(*old_agen_hooks)

这段源码并不复杂,它的主要逻辑是通过把Corotinue转为一个Task对象,然后通过Task对象初始化时调用loop.call_sonn方法把自己注册到EventLoop中,最后再通过loop.run_forever中的循环代码一直运行着,直到_stopping被标记为True:

  1. while True:
  2. # 正真执行任务的逻辑
  3. self._run_once()
  4. if self._stopping:
  5. break

可以看出,这段代码是确保事件循环能一直执行着,自动循环结束,而真正调度的核心是_run_once函数,

它的源码如下:

  1. class BaseEventLoop:
  2. ...
  3. def _run_once(self):
  4. # self._scheduled是一个列表,它只存放TimerHandle
  5. sched_count = len(self._scheduled)
  6. ###############################
  7. # 第一阶段,整理self._scheduled #
  8. ###############################
  9. if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
  10. self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
  11. # 当待调度的任务数量超过100且待取消的任务占总任务的50%时,才进入这个逻辑
  12. # 把需要取消的任务移除
  13. new_scheduled = []
  14. for handle in self._scheduled:
  15. if handle._cancelled:
  16. # 设置handle的_cancelled为True,并且把handle从_scheduled中移除
  17. handle._scheduled = False
  18. else:
  19. new_scheduled.append(handle)
  20.  
  21. # 重新排列堆
  22. heapq.heapify(new_scheduled)
  23. self._scheduled = new_scheduled
  24. self._timer_cancelled_count = 0
  25. else:
  26. # 需要取消的handle不多,则只会走这个逻辑,这里会把堆顶的handle弹出,并标记为不可调度,但不会访问整个堆
  27. while self._scheduled and self._scheduled[0]._cancelled:
  28. self._timer_cancelled_count -= 1
  29. handle = heapq.heappop(self._scheduled)
  30. handle._scheduled = False
  31.  
  32. #################################
  33. # 第二阶段,计算超时值以及等待事件IO #
  34. #################################
  35. timeout = None
  36. # 当有准备调度的handle或者是正在关闭时,不等待,方便尽快的调度
  37. if self._ready or self._stopping:
  38. timeout = 0
  39. elif self._scheduled:
  40. # Compute the desired timeout.
  41. # 如果堆有数据时,通过堆顶的handle计算最短的超时时间,但是最多不能超过MAXIMUM_SELECT_TIMEOUT,以免超过系统限制
  42. when = self._scheduled[0]._when
  43. timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
  44.  
  45. # 事件循环等待事件,直到有事件或者超时
  46. event_list = self._selector.select(timeout)
  47.  
  48. ##################################################
  49. # 第三阶段,把满足条件的TimeHandle放入到self._ready中 #
  50. ##################################################
  51. # 获取得到的事件的回调,然后装填到_ready
  52. self._process_events(event_list)
  53.  
  54. # 把一些在self._scheduled且满足调度条件的handle放到_ready中,比如TimerHandle。
  55. # end_time为当前时间+一个时间单位,猜测是能多处理一些这段时间内产生的事件
  56. end_time = self.time() + self._clock_resolution
  57. while self._scheduled:
  58. handle = self._scheduled[0]
  59. if handle._when >= end_time:
  60. break
  61. handle = heapq.heappop(self._scheduled)
  62. handle._scheduled = False
  63. self._ready.append(handle)
  64.  
  65. ################################################################################
  66. # 第四阶段,遍历所有准备调度的handle,并且通过handle的context来执行handle对应的callback #
  67. ################################################################################
  68. ntodo = len(self._ready)
  69. for i in range(ntodo):
  70. handle = self._ready.popleft()
  71. # 如果handle已经被取消,则不调用
  72. if handle._cancelled:
  73. continue
  74. if self._debug:
  75. try:
  76. self._current_handle = handle
  77. t0 = self.time()
  78. handle._run()
  79. dt = self.time() - t0
  80. if dt >= self.slow_callback_duration:
  81. # 执行太久的回调,记录下来,这些需要开发者自己优化
  82. logger.warning('Executing %s took %.3f seconds',
  83. _format_handle(handle), dt)
  84. finally:
  85. self._current_handle = None
  86. else:
  87. handle._run()
  88. handle = None # Needed to break cycles when an exception occurs.

通过源码分析,可以很明确的知道调度逻辑中第一步是先规整self._scheduled,在规整的过程是使用堆排序来进行的,因为堆排序在调度的场景下效率是非常高的,不过这段规整代码分成两种,我猜测是当需要取消的数量过多时直接遍历的效率会更高。 在规整self._scheduled后,就进入第二步,该步骤开始等待系统事件循环返回对应的事件,如果self._ready中有数据,就不做等待了,需要马上到下一步骤,以便能赶紧安排调度。 在得到系统事件循环得到的事件后,就进入到了第三步,该步骤会通过self._process_events方法处理对应的事件,并把事件对应的回调存放到了self._ready中,最后再遍历self._ready中的所有Handle并逐一执行(执行时可以认为EventLoop把控制权返回给对应的调用逻辑),至此一个完整的调度逻辑就结束了,并进入下一个调度逻辑。

3.网络IO事件的处理

注:由于系统事件循环的限制,所以文件IO一般还是使用多线程来执行,具体见:github.com/python/asyn…

在分析EventLoop调度实现的时候忽略了self._process_events的具体实现逻辑,因为_process_events方法所在asyncio.base_event.py文件中的BaseEventLoop类并未有具体实现的,因为网络IO相关的需要系统的事件循环来帮忙处理,所以与系统事件循环相关的逻辑都在asyncio.selector_events.py中的BaseSelectorEventLoop类中。BaseSelectorEventLoop类封装了selector模块与系统事件循环交互,使调用者不需要去考虑sock的创建以及sock产生的文件描述符的监听与注销等操作,下面以BaseSelectorEventLoop中自带的pipe为例子,分析BaseSelectorEventLoop是如何进行网络IO事件处理的。

在分析之前,先看一个例子,代码如下:

  1. import asyncio
  2. import threading
  3. def task():
  4. print("task")
  5. def run_loop_inside_thread(loop):
  6. loop.run_forever()
  7. loop = asyncio.get_event_loop()
  8. threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
  9. loop.call_soon(task)

如果直接运行这个例子,它并不会输出task(不过在IDE使用DEBUG模式下线程启动会慢一点,所以会输出的),因为在调用loop.run_foreverEventLoop会一直卡在这段逻辑中:

  1. event_list = self._selector.select(timeout)

所以调用loop.call_soon并不会使EventLoop马上安排调度,而如果把call_soon换成call_soon_threadsafe则可以正常输出,这是因为call_soon_threadsafe中多了一个self._write_to_self的调用,它的源码如下:

  1. class BaseEventLoop:
  2. ...
  3. def call_soon_threadsafe(self, callback, *args, context=None):
  4. """Like call_soon(), but thread-safe."""
  5. self._check_closed()
  6. handle = self._call_soon(callback, args, context)
  7. self._write_to_self()
  8. return handle

由于这个调用是涉及到IO相关的,所以需要到BaseSelectorEventLoop类查看,接下来以pipe相关的网络IO操作来分析EventLoop是如何处理IO事件的(只演示reader对象,writer对象操作与reader类似),

对应的源码如下:

  1. class BaseSelectorEventLoop(base_events.BaseEventLoop):
  2. #######
  3. # 创建 #
  4. #######
  5. def __init__(self, selector=None):
  6. super().__init__()
  7.  
  8. if selector is None:
  9. # 获取最优的selector
  10. selector = selectors.DefaultSelector()
  11. self._selector = selector
  12. # 创建pipe
  13. self._make_self_pipe()
  14. self._transports = weakref.WeakValueDictionary()
  15. def _make_self_pipe(self):
  16. # 创建Pipe对应的sock
  17. self._ssock, self._csock = socket.socketpair()
  18. # 设置sock为非阻塞
  19. self._ssock.setblocking(False)
  20. self._csock.setblocking(False)
  21. self._internal_fds += 1
  22. # 阻塞服务端sock读事件对应的回调
  23. self._add_reader(self._ssock.fileno(), self._read_from_self)
  24. def _add_reader(self, fd, callback, *args):
  25. # 检查事件循环是否关闭
  26. self._check_closed()
  27. # 封装回调为handle对象
  28. handle = events.Handle(callback, args, self, None)
  29. try:
  30. key = self._selector.get_key(fd)
  31. except KeyError:
  32. # 如果没有注册到系统的事件循环,则注册
  33. self._selector.register(fd, selectors.EVENT_READ,
  34. (handle, None))
  35. else:
  36. # 如果已经注册过,则更新
  37. mask, (reader, writer) = key.events, key.data
  38. self._selector.modify(fd, mask | selectors.EVENT_READ,
  39. (handle, writer))
  40. if reader is not None:
  41. reader.cancel()
  42. return handle
  43.  
  44. def _read_from_self(self):
  45. # 负责消费sock数据
  46. while True:
  47. try:
  48. data = self._ssock.recv(4096)
  49. if not data:
  50. break
  51. self._process_self_data(data)
  52. except InterruptedError:
  53. continue
  54. except BlockingIOError:
  55. break
  56. #######
  57. # 删除 #
  58. #######
  59. def _close_self_pipe(self):
  60. # 注销Pipe对应的描述符
  61. self._remove_reader(self._ssock.fileno())
  62. # 关闭sock
  63. self._ssock.close()
  64. self._ssock = None
  65. self._csock.close()
  66. self._csock = None
  67. self._internal_fds -= 1
  68.  
  69. def _remove_reader(self, fd):
  70. # 如果事件循环已经关闭了,就不用操作了
  71. if self.is_closed():
  72. return False
  73. try:
  74. # 查询文件描述符是否在selector中
  75. key = self._selector.get_key(fd)
  76. except KeyError:
  77. # 不存在则返回
  78. return False
  79. else:
  80. # 存在则进入移除的工作
  81. mask, (reader, writer) = key.events, key.data
  82. # 通过事件掩码判断是否有其它事件
  83. mask &= ~selectors.EVENT_READ
  84. if not mask:
  85. # 移除已经注册到selector的文件描述符
  86. self._selector.unregister(fd)
  87. else:
  88. # 移除已经注册到selector的文件描述符,并注册新的事件
  89. self._selector.modify(fd, mask, (None, writer))
  90.  
  91. # 如果reader不为空,则取消reader
  92. if reader is not None:
  93. reader.cancel()
  94. return True
  95. else:
  96. return False

通过源码中的创建部分可以看到,EventLoop在启动的时候会创建一对建立通信的sock,并设置为非阻塞,然后把对应的回调封装成一个Handle对象并注册到系统事件循环中(删除则进行对应的反向操作),之后系统事件循环就会一直监听对应的事件,也就是EventLoop的执行逻辑会阻塞在下面的调用中,等待事件响应:

  1. event_list = self._selector.select(timeout)

这时如果执行loop.call_soon_threadsafe,那么会通过write_to_self写入一点信息:

  1. def _write_to_self(self):
  2. csock = self._csock
  3. if csock is None:
  4. return
  5. try:
  6. csock.send(b'\0')
  7. except OSError:
  8. if self._debug:
  9. logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)

由于csock被写入了数据,那么它对应的ssock就会收到一个读事件,系统事件循环在收到这个事件通知后就会把数据返回,然后EventLoop就会获得到对应的数据,并交给process_events方法进行处理,

它的相关代码如下:

  1. class BaseSelectorEventLoop:
  2. def _process_events(self, event_list):
  3. for key, mask in event_list:
  4. # 从回调事件中获取到对应的数据,key.data在注册时是一个元祖,所以这里要对元祖进行解包
  5. fileobj, (reader, writer) = key.fileobj, key.data
  6. if mask & selectors.EVENT_READ and reader is not None:
  7. # 得到reader handle,如果是被标记为取消,就移除对应的文件描述符
  8. if reader._cancelled:
  9. self._remove_reader(fileobj)
  10. else:
  11. # 如果没被标记为取消,则安排到self._ready中
  12. self._add_callback(reader)
  13. if mask & selectors.EVENT_WRITE and writer is not None:
  14. # 对于写对象,也是同样的道理。
  15. if writer._cancelled:
  16. self._remove_writer(fileobj)
  17. else:
  18. self._add_callback(writer)
  19.  
  20. def _add_callback(self, handle):
  21. # 把回调的handle添加到_ready中
  22. assert isinstance(handle, events.Handle), 'A Handle is required here'
  23. if handle._cancelled:
  24. return
  25. assert not isinstance(handle, events.TimerHandle)
  26. self._ready.append(handle)
  27.  
  28. def _remove_reader(self, fd):
  29. # 如果事件循环已经关闭了,就不用操作了
  30. if self.is_closed():
  31. return False
  32. try:
  33. # 查询文件描述符是否在selector中
  34. key = self._selector.get_key(fd)
  35. except KeyError:
  36. # 不存在则返回
  37. return False
  38. else:
  39. # 存在则进入移除的工作
  40. mask, (reader, writer) = key.events, key.data
  41. mask &= ~selectors.EVENT_READ
  42. if not mask:
  43. # 移除已经注册到selector的文件描述符
  44. self._selector.unregister(fd)
  45. else:
  46. self._selector.modify(fd, mask, (None, writer))
  47.  
  48. if reader is not None:
  49. reader.cancel()
  50. return True
  51. else:
  52. return False

从代码中可以看出_process_events会对事件对应的文件描述符进行处理,并从事件回调中获取到对应的Handle对象添加到self._ready中,由EventLoop在接下来遍历self._ready并执行。

可以看到网络IO事件的处理并不复杂,因为系统事件循环已经为我们做了很多工作了,但是用户所有与网络IO相关的操作都需要有一个类似的操作,这样是非常的繁琐的,幸好asyncio库已经为我们做了封装,我们只要调用就可以了,方便了很多。

到此这篇关于Python Asyncio调度原理详情的文章就介绍到这了,更多相关Python Asyncio 内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号