β

Python 的 concurrent.futures 理解

nosa.me 232 阅读

这两天读了 concurrent.futures 源码,记录一下实现原理。

concurrent.futures 的本质是开若干个执行器(线程或进程)来执行 fucntion,fuction 的结果会被设置到 function 对应的 future 中,function 是否执行完通过 threading 事件解决。

对于执行器是线程的情况,是否执行完的实现大概如下:

可以对若干个 future ( fs ) 建立 一个 waiter,并赋给每一个 future,由于每当有 function 执行完之后会把其 future 加入到 waiter 的 finished_futures 中,所以只要检测 finished_futures 就可以,由于 waiter 有 threading 事件,当 future 加入到 finished_futures 的时候,可以 set threading 事件,在等待 finished_futures 的事件就会触发,拿到结果。

而对于进程,Executor 中有个 map() 函数,它返回一个生成器,生成器会依次返回每个 function 的执行结果。

concurrent.futures 主要包括三个文件:_base.py、thread.py 和 process.py,_base.py 主要是 Future 本身的内容,thread.py 和 process.py 是 Future 的执行器。

_base.py 主要定了三部分内容:

1). waiter 类。

class _Waiter(object):
class _AsCompletedWaiter(_Waiter):
class _FirstCompletedWaiter(_Waiter):
class _AllCompletedWaiter(_Waiter):

_Waiter 类用来等待 Future 执行完,_Waiter 里定义了 threading.Event(),_AsCompletedWaiter 每个 Future 完成都会触发 event.set(),_FirstCompletedWaiter 每个 Future 完成也会触发,_AllCompletedWaiter 会等所有 Future 完成才触发 event.set()。

另外,_AsCompletedWaiter 和 _AllCompletedWaiter 还有把锁 threading.Lock()。

2). 辅助函数。

def _create_and_install_waiters(fs, return_when):
def as_completed(fs, timeout=None):
def wait(fs, timeout=None, return_when=ALL_COMPLETED):

_create_and_install_waiters 是对 Future 列表 fs 创建和安装 waiter,创建好响应的 waiter 之后,会对 fs 中的每一个 Future 增加此 waiter (Future 有个列表变量 _waiters,加入即可),并且返回此 waiter;

as_completed 是一个生成器,配合 for 使用可以循环得到已经完成的 Future,as_completed 使用了 _create_and_install_waiters;

wait 用于等待 Future 列表依次完成。

3). Future 类和 Executor 类。

Future 类的成员变量:

self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._traceback = None
self._waiters = []
self._done_callbacks = []

_condition 用于控制 Future 内部的条件,比如 result() 要得到值,如果没有完成就要_condition.wait,直到 set_result() 触发 _condition.notify_all(),当然,cancel() 也可以触发 _condition.notify_all()。

Future 支持 callback,记录在 _done_callbacks,Future 完成后会执行这些 callback。

Executor 类供继承,需要实现 submit 方法,thread.py 中的 ThreadPoolExecutor 和 process.py 中的 ProcessPoolExecutor 都实现了此类。

thread.py 主要包括三部分:

class _WorkItem(object):
def _worker(executor_reference, work_queue):
class ThreadPoolExecutor(_base.Executor):

_WorkItem 是 Future 的包装,变量有 self.future、self.fn (执行的函数)、self.args 和 self.kwargs,里面的 run() 函数用来执行 Future 并设置 Future 信息,_state 会被设置成 FINISHED,如果是无异常会设置 _result,否则设置 _exception 和 _traceback。

_worker 不断从 _work_queue 队列中取 Future 并执行。

ThreadPoolExecutor 实现 _base.Executor,主要变量是 _max_workers  和 _work_queue,_max_workers 是最大线程数,_work_queue 是 queue.Queue(),当调用 submit 的时候会把 Future 包装成 _WorkItem,放入 _work_queue,然后开启最多 _max_workers 的线程去执行 _worker (不断读取队列并执行 )。

process.py  的实现和 thread.py 类似,只是开启进程来执行,下面是大概的逻辑:

92F6AD79-071D-4B1A-9CB7-3671F6ABC1B5

细节就不写了。

参考:

https://docs.python.org/3/library/concurrent.futures.html

Related posts:

  1. Python 程序的发布流程
  2. 关于 Python 线程
  3. python 的 __all__ 测试
  4. Python getattr 的用处
作者:nosa.me
未来不会有sa
原文地址:Python 的 concurrent.futures 理解, 感谢原作者分享。

发表评论