ceph基础库 - threadpool

代码路径

所在代码位置:src/common/WorkQueue.cc

线程池功能描述

线程池主要成员

class ThreadPool : public md_config_obs_t
{ 
    unsigned _num_threads;                           //构造线程池时指定的线程数量
    std::vector<WorkQueue_*> work_queues;  //待处理队列
    std::set<WorkThread*> _threads;              //线程对象集合
    std::list<WorkThread*> _old_threads;   //待join的线程集合
}

线程池函数逻辑

  • start()
void ThreadPool::start_threads()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  while (_threads.size() < _num_threads) {  //1. 线程数量到达配置线程数量之前,不断创建新线程对象
    WorkThread *wt = new WorkThread(this); 
    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
    _threads.insert(wt);                                        //2. 放入到线程池集合中

    wt->create(thread_name.c_str());              //3. 创建pthread线程,并不断运行 WorkThread->pool->worker()
                                                                                  //   _threads中的线程从work_queues中竞争待处理队列
  }
}
  • worker()

线程池核心运行函数

void ThreadPool::worker(WorkThread *wt)
{
  std::unique_lock ul(_lock);
  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();                                         // 1. join _old_threads集合中的线程
    if (_threads.size() > _num_threads) {
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      int tries = 2 * work_queues.size();
      bool did = false;
      while (tries--) {
        next_work_queue %= work_queues.size();
        wq = work_queues[next_work_queue++];  // 2. 某个线程从work_queues中竞争到一个wq
        
        void *item = wq->_void_dequeue();     // 3. 从当前work_queue中取出一个item处理
        if (item) {
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
            << " (" << processing << " active)" << dendl;
          ul.unlock();
                    ...
          wq->_void_process(item, tp_handle); // 4. 调用队列处理函数_void_process,放锁后执行,因此存在并发
          ul.lock();
          wq->_void_process_finish(item);     // 5. 调用队列处理finish函数_void_process_finish,不存在并发
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
            << " (" << processing << " active)" << dendl;
          if (_pause || _draining)
            _wait_cond.notify_all();                  // 6. 唤醒等待的线程
          did = true;
          break;
        }
      }
      if (did)
        continue;
    }
  }
}
  • pause(): 暂停线程池处理
  • unpause(): 继续线程池处理
  • drain(): 排空work_queue
  • stop(): join所有线程池中线程, 清空work_queue_队列(不执行,直接放弃)

队列结构

<img src="ThreadPool/image/image-20220524005804411.png" alt="image-20220524005804411" style="zoom:50%;" />

虚基类WorkQueue_

struct WorkQueue_ {
    std::string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(std::string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;   //取出待处理的items
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };

四类work_queue

  • BatchWorkQueue
    • 每次可以取出多个待处理item
    • 该WorkQueued的item存放容器需要自行定义
    • 需要自行实现如下接口(主要函数):
      • virtual void _dequeue(std::list<T*> *) = 0: 如何从队列中work_queue中拿出items
      • virtual bool _enqueue(T *) = 0: 入队列接口
      • virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0: 批处理接口
  • WorkQueueVal
    • 适用于处理原始值类型或者小对象
    • 将T类型item的值存储队列
    • 存储T类型值的容器需要自行实现
    • 处理缓存容器已经实现,用于存在中间值:
      • std::list<U> to_process; //待处理list, 从放入_void_dequeue()拿出的元素U,每次存入一个
      • std::list<U> to_finish; //_void_process_finish会处理该list中的元素,每次处理一个
    • 需要自行实现如下接口:
      • bool _empty() override = 0: 判断容器非空
      • virtual void _enqueue(T) = 0;: 入队列接口
      • virtual void _process(U u, TPHandle &) = 0;: 处理元素U的函数
  • WorkQueue
    • 适用于处理大对象或者动态申请内存的对象
    • 存储容器需要自行实现
    • 需要自行实现如下接口:
      • virtual bool _enqueue(T *) = 0;: 入workqueue接口
      • virtual T *_dequeue() = 0;: 取work_queue item接口
      • virtual void _process(T *t, TPHandle &) = 0; : item处理接口
  • PointerWQ
    • 适用于处理大对象或者动态申请内存的对象,比WorkQueue更加方便,但是没有WorkQueue抽象
    • 存储容器已经实现:std::list<T *> m_items
    • 只需要实现virtual void process(T *item) = 0;, 用于item处理

可直接使用的两种实现

  • **class GenContextWQ *: public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>>

  • class ContextWQ : public ThreadPool::PointerWQ<Context>

应用举例

创建队列结构

class New_WQ : public ThreadPool::PointerWQ<Context>
{
    public:
        New_WQ(const std::string &name, time_t ti, ThreadPool *tp)
            : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
                this->register_work_queue();
            }
        void process(Context *fin) override;
};

void New_WQ::process(Context *fin)
{
    fin->complete(0);
}

启动线程池

1. 创建ThreadPool
thread_pool = new ThreadPool(cct, "thread_pool", "daemon_tp", g_conf()->rep_thread_pool_nr);
thread_pool->start();

2. 创建队列
work_queue = new New_WQ("daemon", 60, thread_pool);

投递任务

Context *ctx = new Test_TheadPool();
work_queue->queue(ctx);

销毁线程池

work_queue->drain();
delete work_queue;
work_queue = nullptr;

thread_pool->stop();
delete thread_pool;
thread_pool = nullptr;
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • Java基础常见英语词汇(共70个)['ɔbdʒekt] ['ɔ:rientid]导向的 ...
    今夜子辰阅读 3,281评论 1 34
  • C++语言的八股文 C++面向对象的特性 封装——隐藏对象的属性和实现细节,仅对外公开接口和对象进行交互,将数据和...
    wolfaherd阅读 821评论 1 3
  • 个人笔记,方便自己查阅使用 Contents Java LangAssignment, ReferenceData...
    freenik阅读 1,382评论 0 6
  • C#集合相关知识 C#里面Collection下面ArrayList 1、动态数组 Array List:动态数组...
    学习中的小白阅读 186评论 0 0
  • 为什么要进行内存优化:APP运行内存限制,OOM导致APP崩溃。APP性能:流畅性、响应速度和用户体验,因为GC回...
    ArcherZang阅读 887评论 0 0