Python 对ThreadPoolExecutor的简单封装

from concurrent.futures import ThreadPoolExecutor
from threading import Lock

class MRTaskManager(object):
    def __init__(self, max_running, max_waiting):
        self._lock = Lock()
        self._num = 0
        self._max_waiting = max_waiting
        self._max_running = max_running
        self._pool = ThreadPoolExecutor(max_workers=max_running)

    def add(self, job):
        if self._num >= self._max_waiting:
            return
        try:
            self._lock.acquire()
            if self._num >= self._max_waiting:
                logger.info('job pool already reach max job count, abort job {}'.format(job))
                return
            self._num += 1
            self._pool.submit(job.run).add_done_callback(self.callback)
            logger.info('submit async mapreduce task, {}'.format(job))
        except Exception as e:
            logger.error("task register failed, err msg = {}".format(e.message))
        finally:
            self._lock.release()

    def callback(self, _):
        self._lock.acquire()
        self._num -= 1
        self._lock.release()

当线程池中任务超过max_waiting的时候,可以丢弃掉任务,防止在锁上排队的任务太多而挤爆内存?

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容