C++线程池
进程的创建和销毁,代价是昂贵的,除去操作系统的实现及其本身的原理,跟线程相比它更显得重量级。
这几年互联网的迅速发展,让线程正面临着跟进程一样的“重量级”困扰。尤其是GO等语言推出协程(纤程)
后,线程更是不堪其重。那么有没有改进的方向呢?有,将线程池化——线程池。
由于C++版本推进的历程(C++98
, C++03
, C++11
, C++14
, C++17
, C++20
)以及其弱鸡般的ABI兼容性,导致很多框架用起来得自己造轮子。C++版本再吐槽一句好了, C++即使版本推进到0xff, 对很多人来说还是c with class, 包括我。
我们的目标是, 造一个很Java中的ThreadPool类似的线程池。目前的进度:
- [x] 极其简易的线程池(header only)
- [x] 支持设置线程池核心线程数
- [ ] 支持设置线程池最大线程数
- [ ] 支持设置最大缓存的任务数
- [ ] 支持设置任务提交拒绝策略
线程池中的概念:
- job: 需要在线程中执行的代码
- 例如: void read(const string & path, const HANDLE handle);
- 该函数从文本中读取内容然后交给窗口渲染到界面上
- task: 将job封装成一个task, 由于job的函数签名各异,所以需要封装(Java的job是Runnable,接口签名一致)。
- 例如: auto task = [=]()->void { return read(path, handle); }
- 这样就将签名各异的job统一封装成了std::function<void()>类型的task
- 通过std::packaged_task和std::future处理job被异步调用的返回值
- queue: 缓存task的队列, 队列操作和线程池中的线程耦合度很高, 原因如下:
- 队列中的任务少时, 池中的空闲线程如何做到真正的不占用cpu?
- 目前此项目是通过std::condition_variable的条件判断让空闲线程阻塞从而让出cpu
- Java中是通过实现BlockQueue实现的,也就是队列中没有任务时,线程从队列中get会阻塞, 从而让出cpu
- 也可以通过信号量 互斥量实现
- 队列read write操作时, 可根据现实情况实现读优先、写优先的锁来平衡队列task的生产和消费, 目前此项目不支持
- 设置queue的最大缓存task数
- 为什么采取队列, 是为了保证task被执行的优先级(队列可以保证先提交的task被先执行,但是不保证先提交的task被先执行完)
- 队列中的任务少时, 池中的空闲线程如何做到真正的不占用cpu?
- thread: 采用C++11 标准库中的std::thread
- 根据std::thread::hardware_concurrency()获取cpu数量进行任务cpu友好性优化, 目前此项目不支持
- 设置thread的cpu亲和性优化程序执行(Windows平台:通过SetThreadAffinityMask指定线程在cpu哪个核心上运行)
Code
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H
#include <memory>
#include <functional>
#include <future>
#include <condition_variable>
#include <queue>
#include <algorithm>
#include <vector>
#include <thread>
#include <typeinfo>
/**
* 几个需要注意的点:
* 1、tasks的读写锁需要优化成带优先级的锁, 可以肯定线程池的绝大部分使用场景commit task比run task更密集
* 2、根据tasks以及cpu扩展线程数
* 3、支持允许缓存的task数,如果超出此数将采取拒绝策略
* 4、拒绝策略
*/
class ThreadPool{
public:
ThreadPool(int core, int max = 0, int cache = 0): core(core),//由于max和cache暂时没用到,因此赋值0
max(max), cache(cache), quit(false), force(false){
}
~ThreadPool(){
this->quit.store(true);
this->enable.notify_all();
std::for_each(this->pool.begin(), this->pool.end(), [](std::thread & t){
if(t.joinable()){
t.join();
}
});
}
public:
void start(){
for(auto idx = 0; idx < core; ++idx){
pool.push_back(std::thread([this](){
// 第一次退出,判断是否要强制退出
bool quit = this->force.load() ? this->quit.load() : false;
for(; !quit;){
std::unique_lock<std::mutex> lock(this->oper_lock);
this->enable.wait(lock, [this](){
return this->quit.load() || !this->tasks.empty();
});
// 不是强制退出时可从这里退出
if(this->quit.load() && this->tasks.empty()){
return;
}
std::function<void()> task = std::move(this->tasks.front());
this->tasks.pop();
task();
}
}));
}
}
void shutdown(bool force = false){
this->quit.store(true);
this->force.store(force);
}
//void commit(std::function<void (void * param)> task);
template<class T, class... Args>
auto commit(T && t, Args&&...args)->std::future<decltype(t(args...))>{
using TYPE = decltype(t(args...));
if(this->quit.load()){
//dont know return what, so throw an exception
throw std::runtime_error("thread pool is alreay shutdown.");
}
// 1、std::packaged_task<decltype(f(args...))() 类似std::function\
但是会将其封装的可调用元素的结果封装在std::future中
// 2、std::make_shared 创建std::packaged_task<decltype(f(args...))()\
类型的智能指针
// 3、std::bind(std::forward<T>(t), std::forward<Args>(args)...)当做\
std::packaged_task的构造参数
auto task = std::make_shared<std::packaged_task<TYPE()> >(
std::bind(std::forward<T>(t), std::forward<Args>(args)...)
);
std::future<TYPE> result = task->get_future();
std::lock_guard<std::mutex> lock(this->oper_lock);
//将packaged_task 包裹在一个签名为void()的lambda函数中调用,因为此lambda函数符合std::function<void()>\
的签名,所以可以放到queue中
this->tasks.emplace([task](){
(*task)(); //调用packaged_task
});
this->enable.notify_one(); // 在线程池中唤醒一个休眠的线程
return result;
}
private:
//void move();
private:
std::vector<std::thread> pool;
std::queue<std::function<void()> > tasks;
int core; //线程池核心线程数
int max; //线程池根据tasks量以及cpu数最大可扩展的量
int cache; //运行tasks可缓存的最大task数,超出次数后commit将采取拒绝策略
std::atomic<bool> quit; //线程池shutdown条件, true时shutdown
std::atomic<bool> force; //是否强制shutdown,true时有剩余的task将不执行直接退出, false时等待执行完所有的task再退出
std::condition_variable enable; //
std::mutex oper_lock; // queue的读写锁
};
#endif
Test Code
#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>
#include "./pool/ThreadPool.hpp"
int main(int argc, char** argv){
ThreadPool pool(4);
pool.start();
std::default_random_engine rd;
std::uniform_int_distribution<int> rang(100, 1000);
for(int idx = 0; idx < 20; ++idx){
pool.commit([=](int x, int y, int t){
std::cout << "thread id : " << std::this_thread::get_id()
<< " x = " << x << " y = " << y <<
" sleep time = " << t << " ms" <<
" id = " << idx << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(t));
}, rang(rd), rang(rd), rang(rd));
}
std::vector<std::future<int> > results;
for (auto index = 20; index < 50; ++index){
results.push_back(
pool.commit([=]()->int{
return index;
})
);
}
for ( auto & r : results){
std::cout << "get result from thread "
<< " index = " << r.get() << std::endl;
}
char command = std::cin.get();
if (command == 'q'){
pool.shutdown(true);
}else if (command == 'e'){
pool.shutdown(true);
try
{
pool.commit([](){
std::cout << "i want to get an exception" << std::endl;
});
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
}
std::cout << "test finish, OY!" << std::endl;
return 0;
}
Compile & Link
g++ -g -O3 -Wall -std=c++11 main.cpp -o ./out/test
Run
thread id : 0x70000a352000 x = 242 y = 937 sleep time = 480 ms id = 0
thread id : 0x70000a352000 x = 340 y = 390 sleep time = 692 ms id = 1
thread id : 0x70000a352000 x = 188 y = 294 sleep time = 738 ms id = 2
thread id : 0x70000a352000 x = 390 y = 978 sleep time = 270 ms id = 3
thread id : 0x70000a4db000 x = 432 y = 780 sleep time = 102 ms id = 4
thread id : 0x70000a458000 x = 652 y = 661 sleep time = 498 ms id = 5
thread id : 0x70000a3d5000 x = 839 y = 452 sleep time = 487 ms id = 6
thread id : 0x70000a352000 x = 698 y = 540 sleep time = 183 ms id = 7
thread id : 0x70000a4db000 x = 157 y = 983 sleep time = 638 ms id = 8
thread id : 0x70000a3d5000 x = 232 y = 823 sleep time = 766 ms id = 9
thread id : 0x70000a458000 x = 801 y = 411 sleep time = 314 ms id = 10
thread id : 0x70000a458000 x = 359 y = 912 sleep time = 294 ms id = 11
thread id : 0x70000a458000 x = 260 y = 142 sleep time = 372 ms id = 12
thread id : 0x70000a458000 x = 618 y = 499 sleep time = 831 ms id = 13
thread id : 0x70000a458000 x = 108 y = 319 sleep time = 376 ms id = 14
thread id : 0x70000a3d5000 x = 870 y = 490 sleep time = 519 ms id = 15
thread id : 0x70000a352000 x = 446 y = 998 sleep time = 496 ms id = 16
thread id : 0x70000a3d5000 x = 321 y = 308 sleep time = 610 ms id = 17
thread id : 0x70000a3d5000 x = 247 y = 256 sleep time = 629 ms id = 18
thread id : 0x70000a3d5000 x = 186 y = 484 sleep time = 703 ms id = 19
get result from thread index = 20
get result from thread index = 21
get result from thread index = 22
get result from thread index = 23
get result from thread index = 24
get result from thread index = 25
get result from thread index = 26
get result from thread index = 27
get result from thread index = 28
get result from thread index = 29
get result from thread index = 30
get result from thread index = 31
get result from thread index = 32
get result from thread index = 33
get result from thread index = 34
get result from thread index = 35
get result from thread index = 36
get result from thread index = 37
get result from thread index = 38
get result from thread index = 39
get result from thread index = 40
get result from thread index = 41
get result from thread index = 42
get result from thread index = 43
get result from thread index = 44
get result from thread index = 45
get result from thread index = 46
get result from thread index = 47
get result from thread index = 48
get result from thread index = 49
e
thread pool is alreay shutdown.
test finish, OY!