Python 期物之 concurrent.futures.Future

Python 期物用在异步编程,所谓期物指的是排定的执行事件。Python 3.4起

  • 总结
    • 1、期物处理并发只涉及到三个对象,一个是期物(concurrent.futures.Future),一个是执行器(concurrent.futures.Executor),还有一个是 _WorkItem
      • 1)期物对象:本身不涉及多线程,多进程或者yield等语法,其只是一个具有运行状态运行结果以及可添加回调函数的类
      • 2)_WorkItem 对象:真正被添加到任务队列的对象。将一个需执行的函数期物实例化成一个 _WorkItem 对象。通过 run 方法执行,run 方法负责标记期物的状态,执行函数,将执行结果赋值给期物。
      • 3)执行器:有两个方法 map, submit
        • submit 接收一个函数期物,生成 _WorkItem 对象,并将该对象添加到任务队列中。每调用 submit 方法都会调整处理队列的线程个数,如果当前运行线程数小于执行器设置的最大执行线程数,则新建一个线程去处理任务队列。返回值为期物对象
        • map 方法,使用 submit 迭代执行器要执行函数的参数列表,返回一个期物列表。遍历期物列表,使用 yield 去接收每个期物对象的result属性。
    • 2、任务队列的运行:
      • 1)每个线程均执行 _worker()方法
      • 2)任务队列 work_queue 使用 queue.Queue() 存储

循环执行 work_queue.get 得到的 _WorkItem 对象,直到获取的对象为 None

  • Future 源码
class Future(object):
    # 表征了异步计算的结果

    def __init__(self):
        # 初始化 future 实例,不应该通过用户端调用
        self._condition = threading.Condition()  # condition是条件锁
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    # 回调
    def _invoke_callbacks(self):
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    # 格式化输出对象
    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        # 取消期物的调用,取消成功返回 Ture,其余返回 False。
        # 如果期物已经运行或者已经结束,则该期物不可以被取消,返回 True。
        
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            # 唤醒所有使用 _condition 条件阻塞的线程
            self._condition.notify_all()
        
        # 执行任务结束或cancel的回调
        self._invoke_callbacks()
        return True

    def cancelled(self):
        # 如果 future 已被 cancel,返回 True
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        # 如果 future 正在运行,返回 True
        with self._condition:
            return self._state == RUNNING

    def done(self):
        # 如果 future 已被 cancel 或者 执行结束,返回 True
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    # 返回期物运行结果
    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        # 期物运行结束调用的对象
        # fn: 期物运行结束或 cancel 后被调用,总会在所添加的进程内调用。如果期物已经结束或 cancel 则会立即调用;根据添加顺序进行调用
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

    def result(self, timeout=None):
        """
        Returns:
            期物的运行结果
        Raises:
            CanceledError: 期物被 cancell
            TimeoutError: 期物在给定的时间没有执行完毕
            Exception: 其他 Error
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            # 此处会阻塞,等待 notify 
            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """
        Returns:
            期物运行的异常
        Raises:
            CancelledError: 如果期物被 cancel
            TimeoutError: 如果期物在给定的时间没有执行完毕
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """
        标记期物为 RUNNING 或者 CANCELLED_AND_NOTIFIED,
            1、如果期物已经 cancelled 则期物任何等待执行的线程都会被 notify 并且 return False。
            2、如果期物没有被 cancelled,则状态变更为 RUNNING,返回 True
            3、此方法应该在期物所关联的work执行前被调用,如果此方法返回 False,那么 work 不应该被执行。
        Returns:
            如果期物已经被 cancelled,返回 False;其他情况返回 True
        Raises:
            RuntimeError:如果此方法已经被调用或者 set_result() 或者 set_exception()被调用。
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        """
        将期物关联 work 的返回值赋值给期物对象,并发送通知 notify
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """
        使用给定的异常设置期物的 _exception 值
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
  • 单从 Future 类并无法获知期物何时运行,下面引入 ThreadPoolExecutor
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """
        初始化一个 ThreadPoolExecutor 实例
        Args: max_workers 使用最大线程数
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()  # _WorkItem 实例队列
        self._threads = set()  # 实例的线程数
        self._shutdown = False  # 设置为 True 不再接受事件提交
        self._shutdown_lock = threading.Lock()  # 锁

    # 事件提交
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)  # 用以在线程中调用其 run 方法

            self._work_queue.put(w)
            self._adjust_thread_count()  # 用以开启最多 _max_workers 数量的线程,并且在每个线程中 while 循环执行 _work_queue 队列中的实例
            
            return f  # 返回期物
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # 用以唤醒 worker 线程
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
  • Executor
class Executor(object):
    # 异步调用的抽象基类

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """
        Returns:
            迭代器,等同于 map(fn, *iteravles) 但是不是按照顺序执行
        Args:
            fn: 可调用对象,参数在 iterable 对象中
            timeout: 最大等待时间
        Raises:
            TimeoutError: 所有的迭代器不能在给定的时间生成
            Exception: 任何其他异常错误
        """
        if timeout is not None:
            end_time = timeout + time.time()

        # submit 的作用是将 函数+期物 绑定生成_WorkItem 实例对象,并且创建线程去循环执行 _WorkItem 对象实例
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        # 清理所有关联 executor 对象的资源
        pass

    def __enter__(self):
        # return 的 self 是给 as 使用的
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
  • _WorkItem
# 简单的工作类 
class _WorkItem(object):

    # 初始化,参数为 期物+函数+参数
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    # 标记期物为notify,非 True 直接返回。调用期物关联的fn方法。
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)
  • _worker()
# _worker方法
def _worker(executor_reference, work_queue):
    """
    此方法在被调用的线程内 while True 执行
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            """
            1、编译器是否关闭
            2、executor 是否被回收
            3、executor._shutdown 被设置
            """
            if _shutdown or executor is None or executor._shutdown:
                # 通知其他线程的 worker
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

备注

期物的使用标准流程

with futures.ThreadPoolExecutor(10) as executor:
    res = executor.map(func, para_data_list)  # func 是要调用的函数,para_data_list 是参数 list
  • 分析
  • ThreadPoolExecutorProcessPoolExecutor 均继承自 concurrent.futures.Executor,因其实现了 __enter__, __exit__方法,故可以使用 with语法
  • 使用 .map() 方法会调用 .submit() 方法;
  • .submit() 方法:
    • 将函数func + future绑定生成 _WorkItem 实例对象 w
    • w添加到队列 _work_queue
    • 并且创建线程执行 _worker 方法(此处的创建线程是指会最多创建如上例10个线程去执行);
  • _worker() 方法:
    • _work_queue队列中获取 _WorkItem对象 w,执行其 w.run()方法
  • 返回值 res 是生成器,使用迭代获取函数返回的值;
  • future.result() 会阻塞调用。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容