muduo源码 ---ThreadPool介绍

muduo源码分析系列 线程池的实现

分析线程池之前,先介绍线程
毕竟线程池里保存着每个线程
先分析Thread类

class Thread : noncopyable
{
 public:
  typedef std::function<void ()> ThreadFunc;

  explicit Thread(ThreadFunc, const string& name = string());
  // FIXME: make it movable in C++11
  ~Thread();

  void start();
  int join(); // return pthread_join()

  bool started() const { return started_; }
  // pthread_t pthreadId() const { return pthreadId_; }
  pid_t tid() const { return tid_; }
  const string& name() const { return name_; }

  static int numCreated() { return numCreated_.get(); }

 private:
  void setDefaultName();

  bool       started_;
  bool       joined_;
  pthread_t  pthreadId_;
  pid_t      tid_;
  ThreadFunc func_;
  string     name_;
  CountDownLatch latch_;

  static AtomicInt32 numCreated_;
};

仔细观察其实就是把C11的thread相关的方法进行了进一步的封装
但是有个地方 CountDownLatch是什么呢
举个例子:在考试的时候 收卷老师必须要等到所有考生的卷子都收拾好了,才能离开教室。这就是latch的含义,意思是某个线程 必须要等待其他线程完成 才能执行。直接看CountDownLatch的源码

class CountDownLatch :    
{
 public:

  explicit CountDownLatch(int count);

  void wait();

  void countDown();

  int getCount() const;

 private:
  mutable MutexLock mutex_;
  Condition condition_ GUARDED_BY(mutex_);
  int count_ GUARDED_BY(mutex_);
};

看成员: 一个mutex_,一个条件变量condtion,一个公共count_

看方法

CountDownLatch::CountDownLatch(int count)
  : mutex_(),
    condition_(mutex_),
    count_(count)
{
}

void CountDownLatch::wait()
{
  MutexLockGuard lock(mutex_); //上锁
  while (count_ > 0) //等待count变成0
  {
    condition_.wait();
  }
}

void CountDownLatch::countDown()
{
  MutexLockGuard lock(mutex_); //上锁
  --count_; //减少count值
  if (count_ == 0)
  {
    condition_.notifyAll(); //count值为0了 唤醒等待的condtion_变量
  }
}

int CountDownLatch::getCount() const
{
  MutexLockGuard lock(mutex_);
  return count_;
}

看了实现其实很简单:本质就是维护一个共享变量count_,这个count_理解成上面那个例子的学生,每次学生离开教室 那么就调用一次countDown方法,该方法将count-1,如果最后一个学生离开了那么count为0,则调用condtion_的notify方法,唤醒在wait里阻塞的的线程。

知道了latch的功能,就可以开始看Thread相关的方法了。

构造函数

Thread::Thread(ThreadFunc func, const string& n)
  : started_(false),
    joined_(false),
    pthreadId_(0),
    tid_(0),
    func_(std::move(func)),
    name_(n),
    latch_(1) //latch为1 说明只需要等待一个线程countDown即可
{
  setDefaultName();
}

来看最关键的start方法

void Thread::start()
{
  assert(!started_);
  started_ = true;
  // FIXME: move(func_)
  detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
  if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
  {
    started_ = false;
    delete data; // or no delete?
    LOG_SYSFATAL << "Failed in pthread_create";
  }
  else
  {
    latch_.wait();
    assert(tid_ > 0);
  }
}

这里构造了一个 ThreadData类,然后调用系统api,创建出一个新的线程,且这个线程执行的函数是ThreadData里的detail::startThread()方法(执行用户的func)。
如果创建失败,就delete掉,成功就等待latch里的计数器变为0(如果latch是大于0的话),否则就一直阻塞。

来看看这个类的声明

ThreadData(ThreadFunc func,
             const string& name,
             pid_t* tid,
             CountDownLatch* latch)
    : func_(std::move(func)),
      name_(name),
      tid_(tid),
      latch_(latch)
  { }

  void runInThread()
  {
    *tid_ = muduo::CurrentThread::tid();
    tid_ = NULL;
    latch_->countDown(); //将latch_计数器-1 并且如果等于0的时候 唤醒 latch_.wait线程
    latch_ = NULL;

    muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();
    ::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
    try
    {
      func_(); //执行func
      muduo::CurrentThread::t_threadName = "finished";
    }
    catch (const Exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
      abort();
    }
    catch (const std::exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      abort();
    }
    catch (...)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
      throw; // rethrow
    }
  }

上面一堆函数,本质上最重要的还是最终执行了用户传递的func

void* startThread(void* obj)
{
  ThreadData* data = static_cast<ThreadData*>(obj);
  data->runInThread();
  delete data;
  return NULL;
}

threadDetail类里的一个方法,通过万能指针void*进行强制转化成目标ThreadData类,然后执行runInThread,执行完delete掉这个data。

看完这些设计其实可以明白,为什么要这么设计呢?个人认为还是考虑到线程子资源复用的问题。把线程里执行的函数封装成data类,这样每次执行完毕只需要将data删除掉,而不需要去重复分配和删除掉线程。


image.png

并且 也能说明了。为什么runInThread这里tid需要为空了,因为执行完这个data之后就不需要这个data对象的数据了,latch_置为null也是一样的。

感叹陈硕大佬的代码功底

讲完Thread可以开始讲ThreadPool了

先上代码

class ThreadPool : noncopyable
{
 public:
  typedef std::function<void ()> Task;

  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  ~ThreadPool();

  // Must be called before start().
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
  void setThreadInitCallback(const Task& cb)
  { threadInitCallback_ = cb; }

  void start(int numThreads);
  void stop();

  const string& name() const
  { return name_; }

  size_t queueSize() const;

  void run(Task f);

 private:
  bool isFull() const REQUIRES(mutex_);
  void runInThread();
  Task take();

  mutable MutexLock mutex_;
  Condition notEmpty_ GUARDED_BY(mutex_);
  Condition notFull_ GUARDED_BY(mutex_);
  string name_;
  Task threadInitCallback_;
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_);
  size_t maxQueueSize_;
  bool running_;
};

以上是类的声明

接下来是对每个成员变量的解释:
mutable MutexLock mutex_
这是作者把mutex互斥锁进行了一个封装

class CAPABILITY("mutex") MutexLock : noncopyable
{
 public:
  MutexLock()
    : holder_(0)
  {
    MCHECK(pthread_mutex_init(&mutex_, NULL)); //初始化调用系统函数init
  }

  ~MutexLock()
  {
    assert(holder_ == 0);
    MCHECK(pthread_mutex_destroy(&mutex_)); //销毁调用系统函数 destroy
  }

  // must be called when locked, i.e. for assertion
  bool isLockedByThisThread() const
  {
    return holder_ == CurrentThread::tid(); //判断这个锁是否是当前线程锁住的
  }

  void assertLocked() const ASSERT_CAPABILITY(this)
  {
    assert(isLockedByThisThread());
  }

  // internal usage

  void lock() ACQUIRE()
  {
    MCHECK(pthread_mutex_lock(&mutex_)); //加锁
    assignHolder();
  }

  void unlock() RELEASE()
  {
    unassignHolder();
    MCHECK(pthread_mutex_unlock(&mutex_)); //解锁
  }

  pthread_mutex_t* getPthreadMutex() /* non-const */
  {
    return &mutex_;  /
  }

 private:
  friend class Condition;

  class UnassignGuard : noncopyable
  {
   public:
    explicit UnassignGuard(MutexLock& owner)
      : owner_(owner)
    {
      owner_.unassignHolder();
    }

    ~UnassignGuard()
    {
      owner_.assignHolder();
    }

   private:
    MutexLock& owner_;
  };

  void unassignHolder()
  {
    holder_ = 0;
  }

  void assignHolder()
  {
    holder_ = CurrentThread::tid();
  }

  pthread_mutex_t mutex_;
  pid_t holder_;
};

直接看这个Mutex类的成员对象:
pthread_mutex_t mutex_;
pid_t holder_;

也就是说 这个Mutex类对pthread_mutex_t 和pid进行了封装,每个mutex对象都有一个锁和 掌握该锁的线程pid。

再看接下来的ThreadPool成员
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
两个条件变量
string name_;
名称
Task threadInitCallback_;
Task是 typedef std::function<void ()> Task;
本质是function封装的函数

线程池初始化执行的函数
std::vector<std::unique_ptr<muduo::Thread>> threads_;
线程vector
std::deque<Task> queue_ GUARDED_BY(mutex_);
任务队列
size_t maxQueueSize_;
队列最大任务数量
bool running_;
是否在执行

开始分析这个线程池的各个函数

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
    notEmpty_(mutex_),
    notFull_(mutex_),
    name_(nameArg),
    maxQueueSize_(0),
    running_(false)
{
}

成员初始化

void ThreadPool::start(int numThreads)
{
  assert(threads_.empty());
  running_ = true; //设置为运行状态
  threads_.reserve(numThreads); //为线程数量分配vector大小
  for (int i = 0; i < numThreads; ++i) 
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1);
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));  //new一个thread对象 加入到 vector中
    threads_[i]->start(); //执行thread
  }
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();  //仅当传参是0且初始化函数是非空 执行初始化函数
  }
}

start 方法 具体thread类后续分析

void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_); // 当前线程池加锁 防止别的线程使用
  running_ = false;  //设置为结束
  notEmpty_.notifyAll(); //唤醒条件
  notFull_.notifyAll();//唤醒条件
  }
  for (auto& thr : threads_)
  {
    thr->join();  //将所有thread结束掉
  }
}

stop方法
上面的
notEmpty_.notifyAll(); //唤醒所有等待 当前条件变量的线程
notFull_.notifyAll();//唤醒所有等待当前条件变量的线程

实际上就是 调用系统api

  void notifyAll()
  {
    MCHECK(pthread_cond_broadcast(&pcond_));  //pcond就是condtion中的成员你变量
  }

通俗的说就是:在线程池中,多个线程可能会同时等待同一个条件变量 ,此时在等待的时候 会有多个线程被挂起,所以调用notifyAll把所有阻塞的线程唤醒,这样才能进行后续的join操作。
但是,在当前线程池线程中,实际上这两个条件变量更多的表示一个当前的状态。
notEmpty_在wait的情况 :说明当前的线程池并不处于非空的情况 ==》 当前线程池是空的(queue是空的)
notEmpty在notify的情况:当前线程池里的queue是非空,说明有task任务需要执行

同理notFull也是一样

如果还不理解可以仔细google一下条件变量的用法

void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
    task(); //如果当前线程池子没有线程,直接使用线程池所在的线程执行任务
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull() && running_) //如果当前的线程池子里所有线程都被占用了
    {
      notFull_.wait();  //说明当前的线程池处于满任务状态 阻塞
    }
    if (!running_) return;
    assert(!isFull());

    queue_.push_back(std::move(task)); //任务队列入队
    notEmpty_.notify(); //唤醒所有使用notEmpty条件变量的线程
  }
}

run方法
实际上就是在运行start方法之后,暴露给用户使用的接口。
start方法:设置当前线程池的线程数量。run方法,用户通过封装task传递给run方法,run方法里将task存入queue中,等待runInThread对task进行Take()

ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
      notFull_.notify(); 
    }
  }
  return task;
}

take方法,实际上就是从queue队列中取出任务,并且返回任务

void ThreadPool::runInThread()
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_(); //初始化函数
    }
    while (running_)
    {
      Task task(take());
      if (task)
      {
        task();
      }
    }
  }
  catch (const Exception &ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception &ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}

runInThread方法,执行take方法,将task真正执行

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容