线程池设计优化描述
在程序启动时,预先创建一个线程池并初始化多个线程,以确保程序在运行时只需从线程池中直接获取线程执行任务,从而显著提升运行效率。具体设计包括以下几点:
1. 线程池的线程初始化
在线程池中预先创建固定数量的线程,并保持这些线程处于待命状态。
2. 任务队列
线程池内维护一个任务队列,用于存储待执行的任务。任务可以是带参数的函数。
3. 任务提交接口
提供一个接口用于将任务及其参数推送到任务队列中,以便线程池中的线程能够按需执行这些任务。
4. 任务监听与执行
线程池中的线程持续监听任务队列,通过信号量机制判断队列中是否有新任务。如果检测到任务,线程立即从队列中取出并执行。
5. 高效资源利用
每个线程在任务完成后不会销毁,而是回到待命状态,等待下一个任务,从而避免频繁创建和销毁线程的开销。
这样设计能够显著提高程序对多任务的处理效率,同时保持资源利用的高效性和任务调度的灵活性。
线程池的实现
对代码做了详细注释
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include <vector>
#include <future>
class ThreadPool
{
public:
ThreadPool(int num) : running_(true)
{
for (int i = 0; i < num; i++)
{
threads_.emplace_back([this](){
while (true)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
// 线程阻塞直到有任务(需要执行的函数)到任务队列中
cv_.wait(lock, [this]{ return !tasks_.empty() || !running_; });
// 如果队列空了并且线程池需要被停止了,直接返回,结束循环
if (tasks_.empty() && !running_)
{
return;
}
// 从任务队列中取出任务
task = std::move(tasks_.front());
tasks_.pop();
}
// 这样通过 std::bind 创建的 task 对象实际上是一个可调用对象。
// 当你调用 task() 时,它会使用绑定的 f 和 args... 来执行原始的函数或 Lambda 表达式。
task();
}
});
}
}
// 析构函数
// 1. 设置 running_ 为false
// 2. 通知所有线程
// 3. 等待线程执行完成
~ThreadPool()
{
// 将running_ 设置为std::atomic<bool>,可以不用手动加锁了
running_ = false;
cv_.notify_all();
for(auto& thread : threads_)
{
if (thread.joinable()) thread.join();
}
}
// 提交任务到任务队列中
// 1. 使用变长参数模板接收函数的所有参数
// 2. 使用std::forward 完美转发保证参数的左值右值状态, 避免无意间的拷贝或移动,从而提高了效率。
// 3. 通知线程有数据到队列中了
// F&& f, Args&&... args 这个是万能引用
template<typename Func, typename ...Args>
void submitTask(Func&& func, Args&& ... args)
{
// std::function<void()> 是一个通用的可调用对象包装器
// 它可以存储任何符合 void() 签名的可调用对象
// 这里是将 task 定义为一个没有返回值(void)且无参数的函数。
// std::bind 是一个用于创建“绑定”函数对象的标准库功能
// 它能够将一个函数(或者可调用对象)与固定的参数绑定,返回一个新的可调用对象。
std::function<void()> task =
std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
{
std::unique_lock<std::mutex> lock(mtx_);
tasks_.emplace(task);
}
cv_.notify_one();
}
// 带有返回结果的submit函数
template<typename Func, typename ...Args>
auto submitTaskWithResult(Func&& func, Args&& ... args) -> std::future<decltype(func(args...))>
{
// 推导函数返回的类型,rtype代替函数返回类型
using rtype = decltype(func(args...));
// 使用std::packaged_task 接收 std::bind绑定的可调用对象
// 这里使用shard_ptr 是为了将task放入到队列中时, 不会被销毁
// 有可能放入到队列后还没有执行, 这里已经执行完了,导致task被销毁
std::shared_ptr<std::packaged_task<rtype()>> task_ptr = std::make_shared<std::packaged_task<rtype()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
auto fut = task_ptr->get_future();
{
std::unique_lock<std::mutex> lock(mtx_);
// 这个任务队列里面接收到一个lambda函数,这个函数做了执行task函数
// 保持和没有返回值的线程池保持一致, 使用lambda函数在做一层封装
// 不用在线程中解引用再执行
tasks_.emplace([task_ptr]{ (*task_ptr)(); });
}
cv_.notify_one();
return fut;
}
private:
// 线程列表
std::vector<std::thread> threads_;
// 任务队列
std::queue<std::function<void()>> tasks_;
std::condition_variable cv_;
std::mutex mtx_;
std::atomic<bool> running_;
};
void print_task(int index)
{
std::cout << "index " << index << std::endl;
}
int square_task(int x)
{
return x * x;
}
int main()
{
ThreadPool pool(4);
auto res = pool.submitTaskWithResult(square_task, 20);
for (int i = 0; i < 10; i++)
{
pool.submitTask(print_task, i);
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
std::cout << "20 * 20 = " << res.get() << std::endl;
return 0;
}