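The Ceph source file src/common/WorkQueue.h provides a full-featured thread pool. This post walks through the code briefly and explains how to use it.
Main members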
/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
CephContext *cct;
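string name; // name of the thread pool
string thread_name;
string lockname; // name of the lock
Mutex _lock; // mutex protecting the threads and the work queues
Cond _cond; // condition variable paired with _lock
bool _stop; // tells the worker threads to stop
int _pause; // tells the worker threads to pause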
int _draining;
Cond _wait_cond;
int ioprio_class, ioprio_priority;
// track thread pool size changes
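unsigned _num_threads; // number of threads
string _thread_num_option; // config key holding the thread count; used to grow or shrink the pool at runtime
const char **_conf_keys;
vector<WorkQueue_ *> work_queues; // array of work queues
int next_work_queue = 0; // round-robin cursor used when taking tasks from the work queues
set<WorkThread *> _threads; // set of worker threads
list<WorkThread *> _old_threads; ///< threads waiting to be joined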
int processing;
};
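Unlike a typical thread pool implementation, Ceph's pool supports several kinds of work queue. Normally one pool serves one type of work queue. When the requirements are modest, a single pool can also serve several queue types and so handle different kinds of tasks.
Public interface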
class ThreadPool : public md_config_obs_t {
//......
public:
ThreadPool(CephContext *cct_, string nm, string tn, int n,
const char *option = NULL);
~ThreadPool() override;
/// return number of threads currently running
int get_num_threads() {
Mutex::Locker l(_lock);
return _num_threads;
}
/// assign a work queue to this thread pool
void add_work_queue(WorkQueue_ *wq) {
Mutex::Locker l(_lock);
work_queues.push_back(wq);
}
/// remove a work queue from this thread pool
void remove_work_queue(WorkQueue_ *wq) {
Mutex::Locker l(_lock);
unsigned i = 0;
while (work_queues[i] != wq) i++;
for (i++; i < work_queues.size(); i++) work_queues[i - 1] = work_queues[i];
assert(i == work_queues.size());
work_queues.resize(i - 1);
}
/// start thread pool thread
void start();
/// stop thread pool thread
void stop(bool clear_after = true);
/// pause thread pool (if it not already paused)
void pause();
/// pause initiation of new work
void pause_new();
/// resume work in thread pool. must match each pause() call 1:1 to resume.
void unpause();
/** @brief Wait until work completes.
* If the parameter is NULL, blocks until all threads are idle.
* If it is not NULL, blocks until the given work queue does not have
* any items left to process. */
void drain(WorkQueue_ *wq = 0);
/// set io priority
void set_ioprio(int cls, int priority);
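// Thin wrappers around the pool's internal lock and condition variable.
// Callers of the pool's public interface do not need them: the public functions already take the lock internally.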
/// take thread pool lock
void lock() { _lock.Lock(); }
/// release thread pool lock
void unlock() { _lock.Unlock(); }
/// wait for a kick on this thread pool
void wait(Cond &c) { c.Wait(_lock); }
/// wake up a waiter (with lock already held)
void _wake() { _cond.Signal(); }
/// wake up a waiter (without lock held)
void wake() {
Mutex::Locker l(_lock);
_cond.Signal();
}
void _wait() { _cond.Wait(_lock); }
};
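Note that the remove_work_queue loop quoted above never checks bounds, so it relies on the caller passing a queue that was actually added. As a minimal standalone sketch (remove_queue is a hypothetical helper written for this post, not Ceph code, and Q merely stands in for WorkQueue_), the same order-preserving removal could be written with std::find and erase, which simply reports failure for an unknown pointer:

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: `queues` plays the role of ThreadPool::work_queues.
// Returns true if wq was found and removed, false if it was never registered.
template <typename Q>
bool remove_queue(std::vector<Q*>& queues, Q* wq) {
  auto it = std::find(queues.begin(), queues.end(), wq);
  if (it == queues.end()) return false;  // not registered: nothing to do
  queues.erase(it);                      // shifts later entries down, keeps order
  return true;
}
```

In the real class this would still have to run with _lock held, exactly as the original does.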
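Work queues
Among the data members above, vector<WorkQueue_ *> work_queues; is the array of work queues. WorkQueue_ is the base class of every queue type: each work queue derives from WorkQueue_ and implements the corresponding interface.
There are currently four main kinds of queue, described in the sections below: the by-pointer queue (WorkQueue), the simple pointer queue (PointerWQ), the batch queue (BatchWorkQueue), and the by-value queue (WorkQueueVal).
The queues have other functions that can also be overridden, but doing so is optional. See the source for details.
By-pointer queue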
/** @brief Template by-pointer work queue.
* Skeleton implementation of a queue that processes items of a given type
* submitted as pointers. This is useful when the work item are large or
* include dynamically allocated memory. The queue will automatically add
* itself to the thread pool on construction and remove itself on destruction.
*/
template <class T>
class WorkQueue : public WorkQueue_ {
// You must add the queue container member yourself
// Functions that must be overridden
/// Remove all work items from the queue.
virtual void _clear() = 0;
/// Check whether there is anything to do.
virtual bool _empty() = 0;
/// Add a work item to the queue.
virtual bool _enqueue(T *) = 0;
/// Dequeue a previously submitted work item.
virtual void _dequeue(T *) = 0;
/// Dequeue a work item and return the original submitted pointer.
virtual T *_dequeue() = 0;
/// Process a work item. Called from the worker threads.
virtual void _process(T *t, TPHandle &) = 0;
};
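To make the layering concrete, here is a simplified, self-contained model of how an untyped base class, a typed template layer, and a concrete subclass fit together. It is only a sketch: WorkQueueBase, TypedQueue, Job, SumQueue and drain_once are hypothetical names invented for illustration, and the TPHandle parameter, locking and automatic pool registration of the real classes are left out.

```cpp
#include <cassert>
#include <deque>

// Simplified model of the untyped base the pool works with (cf. WorkQueue_).
struct WorkQueueBase {
  virtual ~WorkQueueBase() = default;
  virtual bool _empty() = 0;
  virtual void* _void_dequeue() = 0;
  virtual void _void_process(void* item) = 0;
};

// Typed layer: converts void* back to T* and forwards to _process.
template <class T>
struct TypedQueue : WorkQueueBase {
  virtual T* _dequeue() = 0;
  virtual void _process(T* item) = 0;
  void* _void_dequeue() override { return _dequeue(); }
  void _void_process(void* item) override { _process(static_cast<T*>(item)); }
};

// Hypothetical work item and concrete queue. The subclass owns the container
// and implements the actual work.
struct Job { int value; };

struct SumQueue : TypedQueue<Job> {
  std::deque<Job*> items;
  int total = 0;
  bool _enqueue(Job* j) { items.push_back(j); return true; }
  bool _empty() override { return items.empty(); }
  Job* _dequeue() override {
    if (items.empty()) return nullptr;
    Job* j = items.front();
    items.pop_front();
    return j;
  }
  void _process(Job* j) override { total += j->value; }
};

// What a worker does with a queue it only knows through the base class.
// Returns the number of items processed.
inline int drain_once(WorkQueueBase& wq) {
  int n = 0;
  while (void* item = wq._void_dequeue()) {
    wq._void_process(item);
    ++n;
  }
  return n;
}
```

The point of the split is that the pool never needs to know T: it only sees void* items, while the subclass author only ever writes typed code.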
Simple pointer queue
// Also passes items by pointer, but is a more complete implementation than WorkQueue
template <typename T>
class PointerWQ : public WorkQueue_ {
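// Functions to override
// T* is the type that gets enqueued; it can be a function pointer, a functor, an ordinary class object, and so on
// process() is where you carry out the task that T represents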
virtual void process(T *item) = 0;
/// Remove all work items from the queue.
virtual void _clear() = 0;
};
Batch work queue
/** @brief Work queue that processes several submitted items at once.
* The queue will automatically add itself to the thread pool on construction
* and remove itself on destruction. */
template <class T>
class BatchWorkQueue : public WorkQueue_ {
// Functions that must be overridden
virtual void _process(const list<T *> &items, TPHandle &handle) = 0;
/// Remove all work items from the queue.
virtual void _clear() = 0;
/// Check whether there is anything to do.
virtual bool _empty() = 0;
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
virtual void _dequeue(list<T *> *) = 0;
};
By-value queue
/** @brief Templated by-value work queue.
* Skeleton implementation of a queue that processes items submitted by value.
* This is useful if the items are single primitive values or very small
* objects (a few bytes). The queue will automatically add itself to the
* thread pool on construction and remove itself on destruction. */
template <typename T, typename U = T>
class WorkQueueVal : public WorkQueue_ {
// Functions to override
/// Remove all work items from the queue.
virtual void _clear() = 0;
/// Check whether there is anything to do.
bool _empty() override = 0;
virtual void _enqueue(T) = 0;
virtual void _enqueue_front(T) = 0;
virtual U _dequeue() = 0;
virtual void _process(U u, TPHandle &) = 0;
};
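Usage
To use ThreadPool:
- Pick one or more of the WorkQueue classes to derive from and implement the required methods. Keep in mind the timeout checking described at the end of this post.
- Create the thread pool object and call ThreadPool::start(). That method takes the lock and then calls ThreadPool::start_threads() to launch the worker threads.
- Create the work queue object and call ThreadPool::add_work_queue(WorkQueue_ *wq) to attach it to the pool. According to the doc comments quoted above, WorkQueue, BatchWorkQueue and WorkQueueVal add themselves to the pool on construction, so the explicit call is only needed for queues that do not.
- Add tasks to the work queue; the pool schedules and runs them automatically. The queue's other functions can be used to control its state.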
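The sequence above (create the pool, attach queues, start, enqueue, drain, stop) can be tried out without Ceph using a toy model built on the standard library. Everything here is an assumption made for illustration: MiniQueue, MiniPool and run_demo are hypothetical names, tasks are plain std::function objects rather than typed items, and pause, heartbeat and dynamic resizing are omitted.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Toy model only: MiniQueue and MiniPool are hypothetical, not Ceph classes.
struct MiniQueue {
  std::deque<std::function<void()>> items;  // guarded by the pool lock
};

class MiniPool {
  std::mutex lock_;
  std::condition_variable cond_;       // wakes workers
  std::condition_variable idle_cond_;  // wakes drain()
  std::vector<MiniQueue*> queues_;
  std::vector<std::thread> threads_;
  std::size_t next_ = 0;               // round-robin cursor
  int processing_ = 0;
  bool stop_ = false;
  unsigned num_threads_;

  bool all_empty() const {
    for (auto* q : queues_) if (!q->items.empty()) return false;
    return true;
  }

  void worker() {
    std::unique_lock<std::mutex> l(lock_);
    while (!stop_) {
      bool ran = false;
      // Try each queue at most once, starting from the cursor.
      for (std::size_t tries = queues_.size(); tries > 0 && !ran; --tries) {
        MiniQueue* q = queues_[next_++ % queues_.size()];
        if (q->items.empty()) continue;
        auto job = std::move(q->items.front());
        q->items.pop_front();
        ++processing_;
        l.unlock();
        job();  // run the task without holding the pool lock
        l.lock();
        --processing_;
        ran = true;
      }
      if (ran) idle_cond_.notify_all();
      else cond_.wait(l);  // nothing to do: sleep until queue() or stop()
    }
  }

 public:
  explicit MiniPool(unsigned n) : num_threads_(n) {}
  ~MiniPool() { stop(); }

  void add_work_queue(MiniQueue* q) {
    std::lock_guard<std::mutex> l(lock_);
    queues_.push_back(q);
  }
  void start() {
    std::lock_guard<std::mutex> l(lock_);
    for (unsigned i = 0; i < num_threads_; ++i)
      threads_.emplace_back([this] { worker(); });
  }
  void queue(MiniQueue* q, std::function<void()> job) {
    std::lock_guard<std::mutex> l(lock_);
    q->items.push_back(std::move(job));
    cond_.notify_one();
  }
  void drain() {  // block until every queue is empty and no task is running
    std::unique_lock<std::mutex> l(lock_);
    idle_cond_.wait(l, [this] { return all_empty() && processing_ == 0; });
  }
  void stop() {  // pending tasks are dropped, so call drain() first if needed
    {
      std::lock_guard<std::mutex> l(lock_);
      stop_ = true;
    }
    cond_.notify_all();
    for (auto& t : threads_) if (t.joinable()) t.join();
    threads_.clear();
  }
};

// Two queues sharing one pool of three threads; returns the sum 1 + ... + 10.
inline int run_demo() {
  MiniPool pool(3);
  MiniQueue odd, even;
  pool.add_work_queue(&odd);
  pool.add_work_queue(&even);
  pool.start();
  std::atomic<int> sum{0};
  for (int i = 1; i <= 10; ++i)
    pool.queue(i % 2 ? &odd : &even, [&sum, i] { sum += i; });
  pool.drain();
  pool.stop();
  return sum.load();
}
```

As in the steps above, the queues are attached and the pool is started before any work is submitted, and drain() is called before stop() so that no queued task is lost.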
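Worker thread logic
The logic lives in void ThreadPool::worker(WorkThread *wt). In outline, each iteration of the loop does the following:
- Check whether _stop is true. If so, leave the loop and clean up.
- Call join_old_threads to join the threads in the old list and remove them from it, until the old list is empty.
- Check whether the number of threads in the worker set exceeds _num_threads. If so, remove the current thread from the worker set and move it to the old list.
- If _pause is zero and work_queues is not empty, take one task from the queue that next_work_queue points at and run it. Running a task calls the queue's _void_process and then _void_process_finish. _void_process in turn ends up calling the _process function we overrode. The default _void_process_finish usually does nothing of substance; override it if needed.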
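The order of those checks can be captured as a small pure function, which makes the priorities easy to see and to test. This is a simplified sketch under stated assumptions, not the Ceph loop itself: PoolState, Action and next_action are hypothetical names, each queue is reduced to a pending-item count, joining old threads is left out, and each queue is tried at most once per call.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class Action { Exit, Retire, RunTask, Wait };

// Hypothetical snapshot of the fields the worker loop looks at.
struct PoolState {
  bool stop;
  unsigned live_threads;         // size of the worker set
  unsigned num_threads;          // configured target
  int pause;                     // non-zero means paused
  std::vector<int> queue_sizes;  // pending items per work queue
  std::size_t next_work_queue;   // round-robin cursor
};

// Decide what one worker iteration does; on RunTask, *picked is the queue index.
inline Action next_action(PoolState& s, std::size_t* picked) {
  if (s.stop) return Action::Exit;
  if (s.live_threads > s.num_threads) {  // pool was shrunk: this thread retires
    --s.live_threads;
    return Action::Retire;
  }
  if (s.pause == 0 && !s.queue_sizes.empty()) {
    const std::size_t n = s.queue_sizes.size();
    for (std::size_t tries = n; tries > 0; --tries) {
      std::size_t idx = s.next_work_queue++ % n;
      if (s.queue_sizes[idx] > 0) {
        --s.queue_sizes[idx];  // "dequeue" one item
        *picked = idx;
        return Action::RunTask;
      }
    }
  }
  return Action::Wait;  // paused or nothing queued: sleep on the condition variable
}
```

Because the cursor keeps advancing after every pick, a busy queue cannot starve the others: the next call starts looking at the queue after the one just served.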
Timeout checking
Note that _void_process(void *item, TPHandle &handle) takes a second parameter, a TPHandle.
TPHandle is declared as follows:
class TPHandle {
friend class ThreadPool;
CephContext *cct;
heartbeat_handle_d *hb;
time_t grace; // timeout; once exceeded, the worker is reported as unhealthy
time_t suicide_grace; // suicide timeout; once exceeded, the process aborts itself
public:
TPHandle(CephContext *cct, heartbeat_handle_d *hb, time_t grace,
time_t suicide_grace)
: cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
void reset_tp_timeout();
void suspend_tp_timeout();
};
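Each time a worker runs a task it creates a TPHandle and sets its timeout and suicide timeout from the queue's settings. If the current task runs longer than grace, cct->get_heartbeat_map()->is_healthy() returns false. If it runs longer than suicide_grace, SIGABRT is sent to the worker thread, which brings down the whole process rather than just that thread.
Excerpt from void ThreadPool::worker(WorkThread *wt):
// After the thread starts, register it in the heartbeat map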
heartbeat_handle_d *hb =
cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
while(!_stop){
......
// Create a handle object and arm the timeouts
TPHandle tp_handle(cct, hb, wq->timeout_interval,
wq->suicide_interval);
tp_handle.reset_tp_timeout();
_lock.Unlock();
// Run the task
wq->_void_process(item, tp_handle);
_lock.Lock();
// Post-processing, if any
wq->_void_process_finish(item);
}
// Before the thread exits, remove it from the map
cct->get_heartbeat_map()->remove_worker(hb);
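The timeout detection logic in HeartbeatMap is shown below.
As the code shows, exceeding grace merely sets healthy = false;, whereas exceeding suicide_grace calls pthread_kill(h->thread_id, SIGABRT); to deliver SIGABRT to the offending worker thread. The assert that follows makes sure the process goes down either way.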
bool HeartbeatMap::_check(const heartbeat_handle_d *h, const char *who, time_t now)
{
bool healthy = true;
time_t was;
was = h->timeout;
if (was && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had timed out after " << h->grace << dendl;
healthy = false;
}
was = h->suicide_timeout;
if (was && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had suicide timed out after " << h->suicide_grace << dendl;
pthread_kill(h->thread_id, SIGABRT);
sleep(1);
assert(0 == "hit suicide timeout");
}
return healthy;
}
If you do not want timeout checking, call suspend_tp_timeout on the TPHandle passed to your overridden _process function to disable the timers on the current handle.
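The interplay of grace, suicide_grace, reset and suspend can be modelled in a few lines. The sketch below is an assumption-laden simplification, not the Ceph implementation: Heartbeat, Handle, Health and check are hypothetical names, the clock is passed in explicitly so the behaviour is deterministic, deadlines are stored as absolute times with 0 meaning "not armed" (which is what the `was && was < now` tests in _check suggest), and the suicide case is reported as a value instead of raising SIGABRT.

```cpp
#include <cassert>
#include <ctime>

// Simplified stand-in for heartbeat_handle_d: absolute deadlines, 0 = not armed.
struct Heartbeat {
  time_t timeout = 0;
  time_t suicide_timeout = 0;
};

enum class Health { Healthy, Unhealthy, Suicide };

// Model of TPHandle: grace periods are relative; deadlines are set on reset.
struct Handle {
  Heartbeat* hb;
  time_t grace;
  time_t suicide_grace;

  // Re-arm both deadlines relative to `now`; a long task can call this
  // periodically to signal that it is still making progress.
  void reset_tp_timeout(time_t now) {
    hb->timeout = now + grace;
    hb->suicide_timeout = suicide_grace ? now + suicide_grace : 0;
  }
  // Disarm both deadlines, e.g. around an operation that is expected to block.
  void suspend_tp_timeout() {
    hb->timeout = 0;
    hb->suicide_timeout = 0;
  }
};

// Same two comparisons as HeartbeatMap::_check, reported as a value.
inline Health check(const Heartbeat& h, time_t now) {
  if (h.suicide_timeout && h.suicide_timeout < now) return Health::Suicide;
  if (h.timeout && h.timeout < now) return Health::Unhealthy;
  return Health::Healthy;
}
```

With grace = 10 and suicide_grace = 30 armed at time 100, the model stays healthy through time 110, turns unhealthy from 111, and reports the suicide condition from 131; after suspend_tp_timeout it is healthy at any time until the next reset.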