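Python 中的期物(future)用于并发/异步编程,所谓期物,指的是已排定执行、结果要在将来才能获得的操作。Python 3.4 起,标准库中有两个名为 Future 的类:`concurrent.futures.Future` 和 `asyncio.Future`;本文分析的是前者。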
总结

- 1、期物处理并发只涉及三个对象:期物(`concurrent.futures.Future`)、执行器(`concurrent.futures.Executor`)以及 `_WorkItem` 类。
  - 1)期物对象:本身不涉及多线程、多进程或者 `yield` 等语法,它只是一个具有运行状态(`_state`)、运行结果(`_result` / `_exception`)并可添加回调函数(`add_done_callback`)的类。
  - 2)`_WorkItem` 对象:真正被添加到任务队列中的对象。它把需要执行的函数(及其参数)和期物一起实例化成一个 `_WorkItem` 对象,通过 `run` 方法执行;`run` 方法负责标记期物的状态、执行函数,并将执行结果(或异常)赋值给期物。
  - 3)执行器:有两个方法,`map` 和 `submit`。
    - `submit` 接收一个函数及其参数,新建一个期物,生成 `_WorkItem` 对象,并将该对象添加到任务队列中。每次调用 `submit` 都会调整处理队列的线程个数:如果当前线程数小于执行器设置的最大线程数,就新建一个线程去处理任务队列。返回值为期物对象。
    - `map` 方法:对参数列表中的每一组参数调用 `submit`,得到一个期物列表;随后返回一个生成器,该生成器遍历期物列表,用 `yield` 依次产出每个期物的 `result()`。
- 2、任务队列的运行:
  - 1)每个线程均执行 `_worker()` 方法。
  - 2)任务队列 `work_queue` 使用 `queue.Queue()` 存储。
  - 3)`_worker()` 循环执行 `work_queue.get()` 得到的 `_WorkItem` 对象;当获取到的对象为 `None`,并且解释器正在关闭、执行器已被回收或执行器已调用 `shutdown()` 时,线程退出。
Future 源码
class Future(object):
    """Represents the result of an asynchronous computation.

    A Future is only a state container: it records its state, the result or
    exception of the call, the waiters and the done-callbacks. It does not run
    anything itself; an executor drives it through
    set_running_or_notify_cancel(), set_result() and set_exception().
    """

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Condition variable guarding every state change; result() and
        # exception() block on it until notify_all() is called.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects told about completion via add_result /
        # add_exception / add_cancelled.
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback; failures are logged, not raised."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        """Show the class, address, state and (when finished) outcome type."""
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                self.__class__.__name__,
                id(self),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns:
            False if the future is already running or finished (it cannot be
            cancelled); True if it was cancelled now or had been cancelled
            before.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            # Wake every thread blocked on _condition (result()/exception()).
            self._condition.notify_all()

        # Run done-callbacks outside the lock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        # Re-raise the stored exception, or return the stored result.
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attach a callable to be called when the future finishes.

        Args:
            fn: a callable taking the future as its only argument. It is
                called when the future is cancelled or finishes running.
                Callbacks registered before completion run in the order
                they were added. If the future is already done, fn is
                called immediately in the calling thread.

        An Exception raised by fn is logged and ignored, exactly as for
        callbacks run later by _invoke_callbacks().
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Previously a failing callback propagated here but was only logged
        # when the future completed later; treat both paths the same.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: seconds to wait for completion; None waits without limit.

        Returns:
            The result of the call.

        Raises:
            CancelledError: if the future was cancelled.
            TimeoutError: if the future did not finish within timeout.
            Exception: whatever exception the call itself raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            # Block until set_result/set_exception/cancel calls notify_all(),
            # or until the timeout expires.
            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call, or None if it succeeded.

        Args:
            timeout: seconds to wait for completion; None waits without limit.

        Raises:
            CancelledError: if the future was cancelled.
            TimeoutError: if the future did not finish within timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as RUNNING, or as CANCELLED_AND_NOTIFIED.

        1. If the future was cancelled, its waiters are notified and False
           is returned.
        2. If the future is pending, its state becomes RUNNING and True is
           returned.
        3. Must be called before running the associated work; if it returns
           False the work must not be run.

        Returns:
            False if the future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called, or if
                set_result() or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Stores the value, marks the future FINISHED, informs waiters, wakes
        blocked threads and then runs the done-callbacks.
        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Marks the future FINISHED, informs waiters, wakes blocked threads
        and then runs the done-callbacks.
        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
ThreadPoolExecutor 源码

单从 Future 类无法得知期物何时、如何运行,下面引入 ThreadPoolExecutor:
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted callables on a pool of daemon threads."""

    def __init__(self, max_workers=None):
        """Create a thread-pool executor.

        Args:
            max_workers: upper bound on worker threads. When None it
                defaults to five times os.cpu_count() (treating an unknown
                count as 1).

        Raises:
            ValueError: if max_workers is not greater than 0.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            cpu_total = os.cpu_count() or 1
            max_workers = cpu_total * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        # Pending _WorkItem objects; None is pushed as a wake-up sentinel.
        self._work_queue = queue.Queue()
        # Set of worker Thread objects started so far (not a count).
        self._threads = set()
        # Once True, submit() refuses new work.
        self._shutdown = False
        # Serialises submit() against shutdown().
        self._shutdown_lock = threading.Lock()

    def submit(self, fn, *args, **kwargs):
        # Wrap the call and a fresh Future in a _WorkItem, enqueue it, make
        # sure a worker exists to run it, and hand the Future back.
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            pending = _base.Future()
            self._work_queue.put(_WorkItem(pending, fn, args, kwargs))
            self._adjust_thread_count()
            return pending
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # Start one more worker thread unless max_workers are already running.

        # Called when the executor is garbage collected: a None sentinel
        # wakes a blocked worker, which then sees the dead weak reference
        # and exits.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) >= self._max_workers:
            return
        worker = threading.Thread(
            target=_worker,
            args=(weakref.ref(self, weakref_cb), self._work_queue),
        )
        worker.daemon = True
        worker.start()
        self._threads.add(worker)
        # Module-level registry defined outside this view; presumably used
        # at interpreter exit to wake the workers -- TODO confirm.
        _threads_queues[worker] = self._work_queue

    def shutdown(self, wait=True):
        # Refuse new work, wake one worker with a sentinel (each exiting
        # worker passes it on), and optionally join all workers.
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if not wait:
            return
        for worker in self._threads:
            worker.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
Executor 源码
class Executor(object):
    """Abstract base class for objects that execute calls asynchronously."""

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) and return a Future for its outcome.

        Subclasses must override this; the base version raises
        NotImplementedError.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Return an iterator equivalent to map(fn, *iterables).

        Every call is submitted up front, so the calls may be evaluated out
        of order, but results are yielded in the order of the inputs.

        Args:
            fn: callable taking as many arguments as there are iterables.
            timeout: maximum number of seconds to wait for the whole result
                iterator; None means no limit.
            chunksize: accepted for interface compatibility; ignored by this
                implementation.

        Returns:
            A generator yielding fn's results in input order.

        Raises:
            TimeoutError: if the entire result iterator could not be
                generated before the given timeout.
            Exception: if fn(*args) raised for any set of arguments.
        """
        if timeout is not None:
            # Deadline on the monotonic clock: wall-clock adjustments (NTP,
            # manual changes) must not stretch or shrink the timeout.
            end_time = timeout + time.monotonic()

        # submit() binds function + Future into a work item and makes sure a
        # worker exists to run it; all calls are scheduled before the first
        # result is awaited.
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.monotonic())
            finally:
                # Runs on exhaustion, early close() or an exception: cancel
                # whatever has not started yet (finished or running futures
                # refuse cancellation).
                for future in fs:
                    future.cancel()

        return result_iterator()

    def shutdown(self, wait=True):
        """Release the resources associated with the executor.

        The base implementation does nothing; subclasses override it.
        """
        pass

    def __enter__(self):
        # The returned object is what "with ... as name" binds.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Wait for outstanding work on leaving the with-block; returning
        # False lets an exception raised inside the block propagate.
        self.shutdown(wait=True)
        return False
_WorkItem 源码
# 简单的工作类
class _WorkItem(object):
# 初始化,参数为 期物+函数+参数
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
# 标记期物为notify,非 True 直接返回。调用期物关联的fn方法。
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
else:
self.future.set_result(result)
_worker() 源码
# Body of every pool worker thread.
def _worker(executor_reference, work_queue):
    """Keep taking items from work_queue and running them.

    Args:
        executor_reference: weak reference to the owning executor.
        work_queue: queue of _WorkItem objects; None acts as a wake-up
            sentinel.

    On receiving None the thread exits if the interpreter is shutting down
    (module flag _shutdown), the owning executor has been collected, or the
    executor's own _shutdown flag is set; before exiting it puts None back
    so another worker wakes up as well. Any unexpected exception is logged
    as critical.
    """
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                owner = executor_reference()
                if _shutdown or owner is None or owner._shutdown:
                    # Pass the sentinel on to the other workers.
                    work_queue.put(None)
                    return
                # Don't hold a strong reference while blocked on get().
                del owner
                continue
            item.run()
            # Delete references to object. See issue16284
            del item
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
备注

期物的标准使用流程:
# func: the callable to run; para_data_list: the list of its arguments.
with futures.ThreadPoolExecutor(max_workers=10) as pool:
    res = pool.map(func, para_data_list)
- 分析
  - `ThreadPoolExecutor` 和 `ProcessPoolExecutor` 均继承自 `concurrent.futures.Executor`。由于 `Executor` 实现了 `__enter__`、`__exit__` 方法,故可以使用 `with` 语法;退出 `with` 块时会调用 `shutdown(wait=True)`,等待所有工作线程结束。
  - 使用 `.map()` 方法会调用 `.submit()` 方法;
  - `.submit()` 方法:
    - 将函数 `func`(及其参数)与 `future` 绑定,生成 `_WorkItem` 实例对象 `w`;
    - 将 `w` 添加到队列 `_work_queue`;
    - 并且创建线程执行 `_worker` 方法(最多创建 `max_workers` 个线程,如上例中的 10 个);
  - `_worker()` 方法:
    - 从 `_work_queue` 队列中获取 `_WorkItem` 对象 `w`,执行其 `w.run()` 方法;
  - 返回值 `res` 是生成器,通过迭代获取函数的返回值;
  - `future.result()` 会阻塞调用,直到期物运行结束(或超时)。