线程池,这个线程池暂时不具有自动扩容缩小的动能。
逻辑
正常的线程池的线是这个样子的。
- 创建一个线程池对象,
- 执行
start
成员函数,开始创建线程池。
每创建一个线程就要开始执行函数:如果任务队列中有任务,那么就取出这个任务开始执行。
如果任务队列中没有任务,那么应该阻塞(条件变量)。 - 添加任务(需要互斥量),添加任务以后,需要通知线程。
- 销毁的时候,需要join。
类成员
线程池这个类应该包含:
- 首先,应该定义函数类型,作为任务。
boost::function<void ()> Task
- 一个私有成员,队列来存储任务,先进先出。同时有一个互斥量,保护该容器。有一个私有成员定义最大任务。
std::deque<Task> _queue;
MuteLock _queueLock;
size_t _maxQueueSize;
- 一个容器,用来存储线程,使用了之前我们封装的
Thread
类。使用的容器是boost::ptr_vector
。
boost::ptr_vector<Thread>
- 应该有条件变量,可以使线程阻塞。使用之前封装的
Condition
类。
于任务队列为空时执行的操作和满了的时候执行的操作。
Condition _doTask;
Condition _addTask;
- 需要一个函数用来添加任务到任务队列
void add(Task &task);
- 一个函数用来从队列中取任务,应该是一个私有成员。
Task takeTask();
- 传给线程的函数,这个函数应该调用取任务的函数
void *runInThread(void *);
- 开始创建线程池的函数,和一个指示线程是否开始执行的变量。
void start()
bool _running;
重要的就是上面这几个。
代码
类的成员:
class ThreadPool
{
private:
MuteLock _queueLock;
std::deque<Task> _queue;
size_t maxQueue;
Task takeTask();
bool _running; //用来判断是否开始执行
}
- 首先取任务
Task takeTask()
Task ThreadPool::takeTask()
{
MutexLockGuard lock(_queueLock); //主要是锁住_queue
//空
while (_queue.empty() && _running) //当线程池是空的,并且线程池已经开始执行了
{_doTask.wait();} //如果是空的,那么就线程阻塞。
//非空
Task task;
if(!_queue.empty() && _running)
{
task=_queue.front()
_queue.pop_front();
//取出一个后,就可以再添加任务了
if(queue_.size() > 0 )
{
_addTask.notifyAll();//
}
return task;
}
}
- 传入线程的函数
如果任务队列中没有任务,那么会在takeTask()
中阻塞。
void *ThreadPool::runInThread()
{
//这里可能有异常的。
while (_running)
{
Task task(takeTask());//取任务。可能在此阻塞
if (task) //如果取到了
{task();}
}
}
- 添加任务
void ThreadPool::add(Task &task)
{
if (_threads.empty()) //如果线程池是空的,那么没有可以直接运行,或者调用`start()`
{task();}
else
{
MutexLockGuard lock(_mutex);
//如果任务队列满了,那么不应该再添加了,调用应该在这里阻塞。不再添加。
while (isFull())
{_addTask.wait();}
assert(!isFull());
_queue.push_back(std::move(task));
_doTask.notify();
}
}
同时,isFull()
bool ThreadPool::isFull() const
{return maxQueueSize_ > 0 && queue_.size() >= _maxQueueSize;}
- 创建线程池的函数
void ThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true;
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
_threads.push_back(new muduo::Thread(
boost::bind(&ThreadPool::runInThread, this)));//关于this看下面
_threads[i].start();
}
}
这里的bind
第一个参数需要是指针类型的。
- 停止运行的函数,需要回收线程资源,运行
join()
这里有一种bind的用法
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
notEmpty_.notifyAll();
}
for_each(threads_.begin(),
_threads.end(),
boost::bind(&Thread::join, _1)); //运行成员
}
boost::bind()
本来参数因该是传入的函数参数+1。
类的成员函数不同于普通的函数,因为成员函数指针不能直接调用operator(),它必须被绑定到一个对象或指针,然后才能得到this指针进而调用成员函数。因此bind需要 “牺牲”一个占位符,要求提供一个类的实例、引用或者指针,通过对象作为第一个参数来调用成员函数,
参考:http://www.cnblogs.com/blueoverflow/p/4740093.html
ptr_vector<>
可以看作是一个ptr_vector<unique_ptr>