线程池
概念
- 线程池 <font color=red>调度机制</font>, 在实际开发中, 将每个任务都交给 <font color=red>某个线程</font>是不切实际的, 可以利用 <font color=red>并行并发</font>为任务指定单独的线程去执行
- 线程池提供了上面问题所需要的功能:
- 提交任务到任务队列上
- 工作线程从队列获取任务, 任务完成后, 再从任务队列中获取下一个任务
- 线程池几个关键的问题:
- 线程数量
- 高效的任务分配方式
- 是否阻塞(==即等待一个任务完成==)
简单的线程池(==MSVC==)
1. 线程数量和当前硬件环境(CPU 核数)匹配
2. 挂载任务到任务队列
3. 工作线程从任务队列获取任务, 执行..., 执行完毕后再回来获取新的任务
ps: 线程池中线程不需要等待其他线程, 如果需要等待, 要进行同步管理
- 线程安全的队列(==threadsafe_queue==)
/**
.h文件, 用的时候不要导入这个
直接在main.cpp中导入hpp
*/
#pragma once
#include<condition_variable>
#include<memory>
#include<mutex>
#include<queue>
namespace lb {
using namespace std;
/// A minimal thread-safe FIFO queue: every operation takes `mut`, and
/// consumers can block on `data_cond` until a producer pushes.
template<typename T>
class threadsafe_queue
{
private:
mutable mutex mut;            // mutable: locked even in const empty()
queue<T> data_queue;          // underlying storage, guarded by mut
condition_variable data_cond; // signalled by push(), waited on by wait_and_pop()
public:
threadsafe_queue();
void push(const T& data);          // enqueue + notify one waiter
void wait_and_pop(T& value);       // block until an element is available
shared_ptr<T> wait_and_pop();      // blocking variant returning by shared_ptr
bool try_pop(T& value);            // non-blocking; false when empty
shared_ptr<T> try_pop();           // non-blocking; null pointer when empty
bool empty() const;                // snapshot only -- may be stale immediately
};
}
/**
hpp
*/
#include "threadsafe_queue.h"
#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P
namespace lb {
_TP::threadsafe_queue(){}
/// Enqueue a copy of `data` and wake one waiting consumer.
/// FIX: the original wrote `data_queue.push(move(data))`, but `data` is a
/// const lvalue reference, so std::move was a no-op that still copied
/// (clang-tidy: performance-move-const-arg). The misleading move is removed;
/// behavior is unchanged.
_T void _P::push(const T& data){
lock_guard<mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
/// Block until the queue is non-empty, then move the front element into `value`.
_T void _P::wait_and_pop(T& value){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = move(data_queue.front());
data_queue.pop();
}
/// Blocking pop that returns the element wrapped in a shared_ptr.
_T shared_ptr<T> _P::wait_and_pop(){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // sleeps until push() notifies
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
/// Non-blocking pop: returns false immediately when the queue is empty.
_T bool _P::try_pop(T& value){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return false;
value = move(data_queue.front());
data_queue.pop();
return true;
}
/// Non-blocking pop: an empty shared_ptr signals "no element".
_T shared_ptr<T> _P::try_pop(){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return shared_ptr<T>(); // null pointer == queue was empty
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
/// Emptiness snapshot; `mut` is mutable so the method can stay const.
_T bool _P::empty() const{
lock_guard<mutex> lk(mut);
return data_queue.empty();
}
}
/**
join最合适的位置是在 main函数结束以前
*/
//.h
#pragma once
#include<vector>
#include<thread>
namespace lb {
using namespace std;
/// RAII guard over an externally-owned vector<thread>: joins every
/// still-joinable thread in its destructor, so workers are never left
/// running past the guard's scope.
class join_threads
{
public:
explicit join_threads(vector<thread>& threads_);
~join_threads();
// Read-only access to the guarded thread list.
const vector<thread>& get() const{
return threads;
}
private:
vector<thread>& threads; // non-owning reference; must outlive this guard
};
}
///cpp
#include "join_threads.h"
namespace lb {
join_threads::join_threads(vector<thread>& threads_):threads(threads_){}
/// Join every thread that is still joinable (RAII cleanup).
/// Rewritten with a range-for: the old index loop used `unsigned long`,
/// which mixes signedness/width with vector<thread>::size_type.
join_threads:: ~join_threads(){
for (thread& t : threads){
if (t.joinable())
t.join();
}
}
}
//// .h
#pragma once
#include<vector>
#include<functional>
#include"join_threads.h"
#include"threadsafe_queue.hpp" //注意这里导入的是hpp, 不要导入头文件
namespace lb {
using namespace std;
/// First, simple thread pool: workers spin on the queue (see work()).
class thread_pool{
public:
// Tasks are plain nullary callables in this first version.
using _Task = function<void(void)>;
void work();
thread_pool();
const vector<thread>& get() const {
return this->threads;
}
// Enqueue a task; workers poll the queue, so no notification needed here.
void submit(const _Task& task) {
this->task_queue.push(task);
}
// Ask the workers to stop after their current task.
void over(void) {
finish = true;
}
private:
// NOTE(review): `finish` is written by over() on one thread and read by
// the workers without synchronization -- a data race; std::atomic<bool>
// would be the safe choice. TODO confirm before relying on this.
bool finish; //_code_a
threadsafe_queue<_Task> task_queue; //_code_b
vector<thread> threads; //_code_c
join_threads j_threads; //_code_d
/**
Destruction order matters here (members destroy in reverse declaration
order, so _code_d goes first):
join_threads' job is to join all worker threads, which means that if
`finish` is never set to true, the main thread will wait forever in
that join when the pool is destroyed.
~join_threads() must access the threads stored in `threads`, so
`threads` must not be destroyed before `j_threads`.
*/
};
}
/// cpp
#include "thread_pool.h"
namespace lb {
/// Worker loop: poll the queue until over() raises `finish`.
/// NOTE(review): this is a busy-wait -- yield() softens it but the thread
/// still burns cycles; the condition-variable version later in these notes
/// removes the spin. `finish` is read without synchronization (see class).
void thread_pool::work() {
while (!finish){
_Task tmp;
if (this->task_queue.try_pop(tmp)) {
tmp();   // run the task, then immediately look for the next one
continue;
}
// Queue empty: give up the time slice instead of hammering the lock.
this_thread::yield();
}
}
/// Start one worker per hardware thread.
thread_pool::thread_pool() :finish(false), j_threads(threads) {
try {
// FIX: hardware_concurrency() may legally return 0 ("unknown"), which
// previously produced a pool with no workers at all -- fall back to 2.
unsigned all = thread::hardware_concurrency();
if (all == 0)
all = 2;
threads.reserve(all);
for (; all; --all) {
threads.emplace_back(&thread_pool::work,this);
}
}catch (...) {
// FIX: a failing std::thread constructor throws std::system_error,
// which the old `catch (std::bad_alloc)` (also caught by value) never
// matched. Catch everything: raise the stop flag so already-started
// workers exit, then rethrow to the caller.
finish = true;
throw;
}
}
}
#define _CRT_SECURE_NO_WARNINGS
#include<iostream>
#include<thread>
#include<future>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;
/// Demo task: sleep 2 s, then print this worker's thread id as one record.
void task_a(void) {
this_thread::sleep_for(chrono::milliseconds(2000));
// thread_local: one buffer per worker, so the single cout insertion
// below is not interleaved between threads.
thread_local ostringstream buf;
// FIX: the original opened the stream with ios_base::app and called
// clear() -- clear() only resets state flags and app keeps appending,
// so every later call re-printed the accumulated history. str("")
// actually empties the buffer.
buf.str("");
buf << this_thread::get_id() << " task_a()\n";
cout << buf.str();
}
/// Demo task: print this worker's thread id immediately.
void task_b(void) {
thread_local ostringstream buf;
// FIX: same accumulation bug -- seekp(0) does not discard old content
// (and is ineffective under ios_base::app); reset with str("").
buf.str("");
buf << this_thread::get_id();
buf << " task_b\n";
cout << buf.str();
}
#define version_1 1
/// Driver for the simple pool: queue 10 pairs of tasks, wait for a key,
/// then tell the workers to stop (over() must run before the pool's
/// destructor joins them, or main would hang -- see the class comment).
int main(int arg, char** args) {
#ifdef version_1
lb::thread_pool p;
for (int i = -1; ++i < 10;) {
p.submit(function<void(void)>(task_a));
p.submit(function<void(void)>(task_b));
}
system("pause"); // Windows-only: keep the console open to see the output
p.over();
#endif // FIX: this #endif was missing -- the unterminated #ifdef is a compile error
return 0;
}
简单线程池中的问题
1. 线程池中的线程在不断循环判断
2. 上述只是 一个 void(void)的函数, 而实际开发中:
用户的函数可能有返回值, 参数也是多样的
思路:
对于第1个问题:
子线程没有任务的时候应该是挂起的
当main提交任务后, 是唤醒thread_pool中挂起的线程, 做完事后再挂起
对于第2个问题分2步:
用户的函数是多种多样的:
返回值不定
参数个数不定, 参数类型不定
所以必须用到泛型
返回值势必用到 future和packaged_task
子线程改为wait
//修改threads_pool的成员变量
// Revised thread_pool member list: a mutex + condition_variable are added
// so idle workers can sleep on `cond` instead of busy-spinning.
class thread_pool{
public:
....
private:
bool finish;
mutex empty; // paired with `cond`; the queue itself is already thread-safe
condition_variable cond; // workers wait here; submit()/over() notify
threadsafe_queue<_Task> task_queue;
vector<thread> threads;
join_threads j_threads; // declared last: joins workers on destruction
};
// 修改work和submit的函数
// Revised submit(): after queuing, wake the sleeping workers.
void submit(const _Task& task) {
this->task_queue.push(task);
// wake all workers to compete for the task
cond.notify_all();
}
// Revised work(): block on the condition variable instead of spinning.
void thread_pool::work() {
while (!finish){
/**
Woken when finish becomes true, or when the queue is non-empty.
After wake-up, wait() re-locks the member mutex `empty`; the queue
itself is already thread-safe, so we unlock manually right after.
*/
unique_lock<mutex> u_mt(this->empty);
cond.wait(u_mt, [this] {
if(this->finish)
return true;
return !this->task_queue.empty();
});
if(finish){
u_mt.unlock();
continue; // loop condition sees finish==true and exits
}
u_mt.unlock();
_Task tmp;
if (this->task_queue.try_pop(tmp))
tmp();
}
}
// Revised over(): raise the flag, then wake everyone so they can exit.
void over(void) {
finish = true;
cond.notify_all(); // remember to notify all waiting workers
}
返回值的解决思路
- 因为是异步的, 所以用前面的 <font color=red>future</font>, 即 <font color=red>submit返回一个future</font>
- 先来看看, 怎么获取 <font color=red>callable</font>的返回值==类型==
/**
在以前分析 std::async的时候, STL中用到过 对callable的返回值traits
举例
*/
#include<type_traits>
#include<typeinfo>
/// Demo: use MSVC's _Invoke_result_t to extract a callable's return type.
/// NOTE(review): the standard forbids taking the address of main
/// (`&main`); MSVC accepts it, but this is non-portable -- TODO confirm.
int main(int arg, char** args){
using _Ret = _Invoke_result_t<
decay_t<decltype(&main)>,
decay_t<int>,
decay_t<char**>
>;
// Traits out main's return type, so _Ret is int.
/**
ps: decltype(main) and decltype(&main) are different types:
decltype(&main) is int(*)(int,char**) // function pointer
decltype(main)  is int(int,char**)    // like function<type>'s template arg
*/
/// decay turns int[] into int* (as discussed earlier in the notes)
cout << typeid(decay_t<int[]>).name() << endl;
// A lambda's type:
auto tmp = [](int a, char**, int[], int len) ->int {
return 0;
};
// Traits out the lambda's return type, so _Lambda_Ret is int.
using _Lambda_Ret = _Invoke_result_t<
decay_t<decltype(tmp)>,
decay_t<int>,
decay_t<char**>,
decay_t<int[]>,
decay_t<int>
>;
}
tuple妙用(==任务参数的多样性==)
/**
回想thread的构造函数, 它的过程就不说了, 可以用thread的构造函数来
解决参数不定以及返回值的问题, 这里给出一个demo
是模仿thread的构造函数, 功能是将用户的 函数存储起来, 想在什么时候调用
就什么时候调用
*/
// Demo namespace mimicking std::thread's constructor machinery: store a
// callable + its arguments in a heap tuple and call it later through a
// type-erased entry point. NOTE: the tuple* allocated in fun() is never
// freed here -- this leak is discussed and fixed further down in the notes.
namespace _tt {
/// The generated call shim: invokes the stored callable with the stored args.
template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
callable(args...);
}
/// Intermediate entry point; all external calls come through here.
//// Note: this deliberately uses the __stdcall calling convention.
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
_Tuple* _FnVals = (_Tuple*)_RawVals;
_Tuple& _Tup = *_FnVals;
printf("--- _Tup: %p\n", &_Tup); // debug trace of the tuple's address
_tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
return 0;
}
/// Instantiate a concrete entry point for the user's callable + argument pack.
template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}
/// Two globals used for the test:
/// the entry-point address for the user's callable/args ...
static void* _invoke_address = nullptr;
/// ... and the argument block (really a tuple*).
static void* _args_address = nullptr;
template<typename Fir, typename... Args>
auto fun(Fir&& fir, Args&&... args) ->
future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> { // like async: returns a future
// The user function's return type (int in the demo).
using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
using _Future = future<_Ret>;
/// Wrap the callable in a packaged_task so a future can observe it.
using _Packaged = packaged_task<_Ret(Args...)>;
/// The packaged_task plus the user's arguments live together in a tuple.
using _Tuple = tuple<_Packaged, Args...>;
/// Wrap fir into the packaged_task.
_Packaged callable(fir);
/// Generate the entry-point address for this tuple shape; stash it globally (test only).
_invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
/// Store the task and its arguments on the heap.
auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
// Remember it globally; later the threadsafe_queue will hold such pointers.
_args_address = p_tuple;
/// Hand the packaged_task's future to the caller so it can get().
auto result(std::get<0>(*p_tuple).get_future());
return result;
}
}
/**
测试1, 在子线程中执行任务
*/
/// Demo task: prints and returns 0; the int result is captured by the
/// packaged_task built inside _tt::fun.
int t_fun(int) {
cout << "hello\n";
return 0;
}
/// Test 1: run the stored task on a child thread and fetch its result.
int main(int arg, char** args) {
{
// Register the task and receive its future.
auto tmp = _tt::fun(t_fun, 2);
// Simulate async's os-side invocation with a child thread that runs
// t_fun through the stored entry point.
thread t_([&tmp] {
// The entry point's signature is
//   unsigned int __stdcall _Invoke(void*)
// The global slot is a plain void*, so the cast must restate exactly
// that signature (MSVC's default calling convention is NOT __stdcall,
// hence the explicit __stdcall in the cast).
((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
// FIX: `this_thread::get_id` without () streamed the function's
// address instead of the calling thread's id.
std::cout << this_thread::get_id() << " "<< tmp.get() << endl;
});
t_.join();
return 0;
}
}
/**
测试2, 不同的用户函数(签名不一样)
*/
/// Second demo task with a different signature (const char* in/out), used
/// to show the machinery handles arbitrary callables.
const char* test(const char*) {
cout << "test()\n";
return "yangrui\n";
}
/// Test 2: two tasks with different signatures, both driven through the
/// same type-erased entry-point mechanism.
int main(int arg, char** args){
auto tmp = _tt::fun(t_fun, 2);
((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
// FIX: `get_id` without parentheses streamed a function pointer, not the id.
std::cout << this_thread::get_id() << " "<< tmp.get() << endl;
auto tmp2 = _tt::fun(test, 22);
((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
std::cout << this_thread::get_id() << " "<< tmp2.get() << endl;
}
/**
在上面的测试中, 有内存泄露 tuple* 没有被释放
*/
解决内存泄露(==完整的代码==)
// Leak-free revision of _tt: _Invoke now takes ownership of the heap tuple
// via unique_ptr, so it is destroyed after the task runs.
namespace _tt {
/// Call shim: invokes the stored callable with the stored arguments.
template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
callable(args...);
}
/// Type-erased entry point; owns and frees the tuple after the call.
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
// unique_ptr adopts the raw tuple*, fixing the leak in the first version.
unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
printf("--- _Tup: %p\n", &_Tup); // debug trace
_tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
return 0;
}
/// Instantiate the entry point matching this tuple shape.
template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}
// Test-only globals: entry-point address and argument block (tuple*).
static void* _invoke_address = nullptr;
static void* _args_address = nullptr;
/// Pack `fir(args...)` into a heap tuple + entry point; return its future.
template<typename Fir, typename... Args>
auto fun(Fir&& fir, Args&&... args) ->
future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
using _Future = future<_Ret>;
using _Packaged = packaged_task<_Ret(Args...)>;
using _Tuple = tuple<_Packaged, Args...>;
_Packaged callable(fir);
_invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
_args_address = p_tuple; // ownership passes to _Invoke when it runs
auto result(std::get<0>(*p_tuple).get_future());
return result;
}
}
/// Demo task returning an int captured by the packaged_task.
int t_fun(int) {
cout << "hello\n";
return 2424;
}
/// Demo task with a 5 s delay, to show get() blocking until completion.
const char* test(const char*) {
cout << "test()\n";
this_thread::sleep_for(chrono::seconds(5));
return "yangrui\n";
}
/// Test of the leak-free _tt: each _Invoke call now frees its tuple.
int main(int arg, char** args) {
auto tmp = _tt::fun(t_fun, 2);
((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
//std::cout << this_thread::get_id() << " "<< tmp.get() << endl;
auto tmp2 = _tt::fun(test, "lu");
((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
// FIX: `get_id` without parentheses streamed a function pointer, not the id.
std::cout << this_thread::get_id() << " "<< tmp2.get() << endl;
}
在上面的static unsigned int __stdcall _Invoke(void* _RawVals) noexcept
中使用unique_ptr来包装tuple*
这里面的内存释放关乎2个new:
1. tuple*
2. 在创建任务时(fun函数内部) packaged_task内部的_Packaged_state<...> *
首先tuple*:
0. 它的作用是 存储callable(packaged_task)和用户的所有参数
1. fun函数内部被创建
2. 内部的callable间接引用着 _Packaged_state<..> *
一个好释放理由是, 自己释放, 不要让用户手动释放
所以释放tuple*的1个设计是 在调用完 _invoke 后, 应该自动释放tuple*
方法就是在_invoke中对 tuple* 做unique, 具体见上面的代码
其次_Packaged_state<>*:
0. 它在本例中的作用是 存储用户的返回值
1. fun内部会创建 packaged_task对象, 进而创建_Packaged_state*
2. 当创建 tuple*的时候, 会将 _Packaged_state* 转移给tuple<0>
3. 当用tuple<0>获取result(future)时, result也指向_Packaged_state*
4. 当fun函数结束, 先拷贝构造临时对象tmp(future), result中的_Packaged_state*被转移到tmp
4.1 tmp如果没有外界接收, 会被析构, 进而可能会delete _Packaged_state*
但此时有2个对象引用 _Packaged_state(tuple<0>和tmp), 所以tmp只是将计数-1, 并不会释放_Packaged_state*
4.2 有外界接收, 则tmp被外界引用, 不会析构
ps: _Packaged_state的new指针是使用了引用计数原理(以前没有说过, 这里提一下), 但不是STL中的shared_ptr
5. 接着释放局部的packged_task对象, 但发现_Packaged_state已经为空
所以不会释放_Packaged_state
6. main函数中调用完fun后, 借用了全局指针调用 _invoke 函数
传递的指针其实就是 tuple*
_invoke内部会拿到callable(packaged_task)调用 invoke函数
invoke的调用会触发到 callable的重载(), 并在调用结束后存储返回值到 packaged_state*指向的对象中
这个过程和以前探讨的 packaged_task源码是一样的
此时如果外界通过 packaged_task的future对象获取返回值(get())时的过程就不说了, 前面源码中已经很
详细了
7. 在 _invoke内部, 用unique<tuple> 包装了tuple*
7.1 函数完毕后, 会释放 unique_ptr<tuple>, 进而释放tuple中的callable
callable指向的_Packaged_state*也会根据计数器决定要不要释放
8. 如果main函数中 接收第4步fun函数返回的future对象(tmp, tmp2), 则main函数结束后
会释放tmp, tmp2, 它们内部都引用_Packaged_state*, 但也会根据计数器来释放_Packaged_state*
改进前面的线程池
#pragma once
#include<mutex>
#include<queue>
namespace lb {
using namespace std;
// (Same thread-safe queue as earlier in these notes, repeated so the
// improved pool's listing is complete.)
// NOTE(review): <condition_variable> and <memory> are used but not
// included above -- MSVC's <mutex> happens to pull them in; add them for
// portability. TODO confirm.
template<typename T>
class threadsafe_queue
{
private:
mutable mutex mut;
queue<T> data_queue;
condition_variable data_cond;
public:
threadsafe_queue();
void push(const T& data);
void wait_and_pop(T& value);
shared_ptr<T> wait_and_pop();
bool try_pop(T& value);
shared_ptr<T> try_pop();
bool empty() const;
};
}
#include "threadsafe_queue.h"
#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P
// (Identical to the queue implementation earlier in these notes.)
namespace lb {
_TP::threadsafe_queue(){}
/// Enqueue and wake one waiting consumer.
_T void _P::push(const T& data){
lock_guard<mutex> lk(mut);
data_queue.push(move(data)); // NOTE(review): move on a const ref is a no-op copy
data_cond.notify_one();
}
/// Block until non-empty, then move the front element into `value`.
_T void _P::wait_and_pop(T& value){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = move(data_queue.front());
data_queue.pop();
}
/// Blocking pop returning the element through a shared_ptr.
_T shared_ptr<T> _P::wait_and_pop(){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // 4
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
/// Non-blocking pop; false when empty.
_T bool _P::try_pop(T& value){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return false;
value = move(data_queue.front());
data_queue.pop();
return true;
}
/// Non-blocking pop; empty shared_ptr when the queue is empty.
_T shared_ptr<T> _P::try_pop(){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return shared_ptr<T>(); // 5
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
/// Emptiness snapshot (const thanks to the mutable mutex).
_T bool _P::empty() const{
lock_guard<mutex> lk(mut);
return data_queue.empty();
}
}
#pragma once
#include<vector>
#include<thread>
namespace lb {
using namespace std;
/// RAII guard: joins every still-joinable thread of the referenced vector
/// on destruction. (Same as earlier in the notes.)
class join_threads
{
public:
explicit join_threads(vector<thread>& threads_);
~join_threads();
const vector<thread>& get() const{
return threads;
}
private:
vector<thread>& threads; // non-owning; must outlive the guard
};
}
#include "join_threads.h"
namespace lb {
join_threads::join_threads(vector<thread>& threads_):threads(threads_){}
/// Join every still-joinable worker (RAII cleanup). Range-for replaces the
/// old `unsigned long` index loop, avoiding the signed/width mismatch with
/// vector<thread>::size_type.
join_threads:: ~join_threads(){
for (thread& t : threads){
if (t.joinable())
t.join();
}
}
}
#pragma once
#include<vector>
#include<future>
#include<type_traits>
#include"join_threads.h"
#include"threadsafe_queue.hpp"
namespace lb {
using namespace std;
/// Type-erased task record queued for the workers: the entry-point address
/// plus its argument block (really a tuple*).
struct _call_info {
void* addr; // unsigned int (__stdcall*)(void*) -- the _Invoke instantiation
void* args; // heap tuple<packaged_task, Args...>*; freed by _Invoke
};
/// Call shim: invokes the stored callable with the stored arguments.
template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
callable(args...);
}
/// Type-erased entry point; adopts the tuple via unique_ptr so it is
/// destroyed after the task runs (no leak).
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept {
unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);
return 0;
}
/// Instantiate the entry point matching this tuple shape.
template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &lb::_Invoke<_Tuple, _Indices...>;
}
/// Thread pool with generic submit(): any callable + arguments, result
/// observable through the returned future.
class thread_pool{
public:
using _Task = _call_info;
void work();
thread_pool();
/// Type-erase `fir(args...)` into a heap tuple + entry point, enqueue it,
/// and return the future observing its result.
template<typename Fir, typename... Args>
auto submit(Fir&& fir, Args&&... args) ->
future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
using _Future = future<_Ret>;
using _Packaged = packaged_task<_Ret(Args...)>;
using _Tuple = tuple<_Packaged, Args...>;
_Packaged callable(fir);
_call_info _call;
_call.addr = lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
// FIX: take the future *before* publishing the task. Once pushed, a
// worker may run _Invoke -- which deletes p_tuple via unique_ptr --
// concurrently; the original `return std::get<0>(*p_tuple).get_future();`
// placed after push() was a use-after-free race.
auto result = std::get<0>(*p_tuple).get_future();
_call.args = p_tuple;
task_queue.push(_call);
cond.notify_all(); // wake the sleeping workers
return result;
}
const vector<thread>& get() const {
return this->threads;
}
/// Raise the stop flag and wake every worker so it can exit.
void over(void) {
finish = true;
cond.notify_all();
}
private:
// NOTE(review): read/written by several threads without synchronization;
// std::atomic<bool> would be the safe choice. TODO confirm.
bool finish;
mutex empty;               // paired with `cond`; the queue is already thread-safe
condition_variable cond;   // workers sleep here between tasks
threadsafe_queue<_Task> task_queue;
vector<thread> threads;
join_threads j_threads;    // declared last: joins workers on destruction
};
}
#include "thread_pool.h"
#include<iostream>
namespace lb {
/// Worker loop: sleep on `cond` until a task arrives or over() is called.
/// NOTE(review): tasks still queued when `finish` goes true are never run,
/// so their heap tuples leak -- acceptable for these notes, worth fixing in
/// production (drain the queue before exiting).
void thread_pool::work() {
while (!finish){
unique_lock<mutex> u_mt(this->empty);
// Wake on shutdown or on a non-empty queue.
cond.wait(u_mt, [this] {
if (this->finish)
return true;
return !this->task_queue.empty();
});
if (finish) {
u_mt.unlock();
continue; // loop condition sees finish==true and exits
}
// The queue has its own lock, so release `empty` before popping.
u_mt.unlock();
_Task tmp;
if (this->task_queue.try_pop(tmp))
// Re-materialize the entry point's signature and run the task;
// _Invoke frees the tuple afterwards.
((unsigned int(__stdcall*)(void*))tmp.addr)(tmp.args);
}
std::cout << "over\n";
}
/// Start one worker per hardware thread.
thread_pool::thread_pool() :finish(false), j_threads(threads) {
try {
// FIX: hardware_concurrency() may return 0 ("unknown"); fall back so
// the pool never starts with zero workers.
unsigned all = thread::hardware_concurrency();
if (all == 0)
all = 2;
threads.reserve(all);
for (; all; --all) {
threads.emplace_back(&thread_pool::work,this);
}
}catch (...) {
// FIX: a failing std::thread constructor throws std::system_error,
// which the old `catch (std::bad_alloc)` (caught by value, wrong type)
// never matched. Stop any started workers and rethrow.
finish = true;
cond.notify_all();
throw;
}
}
}```
<br>
- 测试
```cpp
#define _CRT_SECURE_NO_WARNINGS
#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;
/// Demo task: sleep 2 s, print one record, return a sentinel int.
int test_a(int num) {
this_thread::sleep_for(chrono::milliseconds(2000));
// One buffer per worker thread -> the single cout insertion below is
// not interleaved between threads.
thread_local ostringstream buf;
// FIX: with ios_base::app, seekp(0) never discards previous content, so
// str() could re-emit stale text from earlier calls on the same worker;
// str("") empties the buffer for real.
buf.str("");
buf << "test_a: \t";
buf << this_thread::get_id();
buf << "\targ(num):\t\t";
buf << num;
buf << "\n";
cout << buf.str();
return 2424;
}
/// Demo task: print one record, return a string literal (static storage,
/// safe to hand through the future).
const char* test_b(const char* cstr) {
thread_local ostringstream buf;
buf.str(""); // same fix as test_a
buf << "test_b: \t";
buf << this_thread::get_id();
buf << "\targ(cstr):\t\t";
buf << cstr;
buf << "\n";
cout << buf.str();
return "yangrui\n";
}
/// Exercises the generic pool. Only the first scope runs (it returns);
/// the later scopes are alternative scenarios kept for reference.
int main(int arg, char** args) {
lb::thread_pool p;
{
// Scenario 1: member-function task.
struct _B {
int test(const char* arg, int num) {
cout << arg << endl;
cout << num << endl;
return 242;
}
};
_B b;
p.submit(&_B::test, b, "helooooo", 24242);
// FIX: returning without over() left every worker blocked on the
// condition variable; ~thread_pool then joins them and main hangs
// forever. Raise the stop flag before the early return.
p.over();
return 0;
}
{
// Scenario 2: lambda task.
p.submit([](int a, int b) {
cout << this_thread::get_id() << endl;
}, 2, 4);
getchar();
p.over();
return 0;
}
{
// Scenario 3: futures carrying results back.
auto f_a = p.submit(test_a, 222);
auto f_b = p.submit(test_b, "hello");
cout << f_a.get() << endl;
cout << f_b.get() << endl;
}
for (int i = -1; ++i < 10;) {
p.submit(test_a, i* 12);
p.submit(test_b, "杨杨");
}
system("pause"); // Windows-only: keep the console open to see the output
p.over();
}
修改为g++下的线程池
说明(C++17下编译)
#ifndef _JOIN_THREAD_H_
#define _JOIN_THREAD_H_
#include<vector>
#include<thread>
namespace lb {
using namespace std;
/// RAII guard joining all still-joinable threads on destruction
/// (g++/C++17 port of the earlier class; interface unchanged).
class join_threads
{
public:
explicit join_threads(vector<thread>& threads_);
~join_threads();
const vector<thread>& get() const{
return threads;
}
private:
vector<thread>& threads; // non-owning; must outlive the guard
};
}
#endif
#include "join_threads.h"
namespace lb {
join_threads::join_threads(vector<thread>& threads_):threads(threads_){}
/// Join every still-joinable worker (RAII cleanup); range-for replaces the
/// old `unsigned long` index loop.
join_threads:: ~join_threads(){
for (thread& t : threads){
if (t.joinable())
t.join();
}
}
} // namespace lb -- FIX: this closing brace was missing, leaving the
  // namespace open and breaking everything compiled after this file
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<vector>
#include<future>
#include<type_traits>
#include"join_threads.h"
#include"threadsafe_queue.hpp"
#define __stdcall __attribute__((__stdcall__))
#define _STD std::
namespace lb {
using namespace std;
/// Type-erased task record: entry-point address + argument block (tuple*).
struct _call_info {
void* addr; // uint32_t (__stdcall*)(void*) -- the _Invoke instantiation
void* args; // heap tuple<packaged_task, Args...>*; freed by _Invoke
};
/// Call shim: invokes the stored callable with the stored arguments.
template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
callable(args...);
}
/// Type-erased entry point; adopts the tuple via unique_ptr so it is
/// destroyed after the task runs.
/// NOTE(review): __attribute__((__stdcall__)) is ignored (with a warning)
/// on x86-64 g++ -- it only matters on 32-bit x86. TODO confirm target.
template <class _Tuple, size_t... _Indices>
static uint32_t __stdcall _Invoke(void* _RawVals) noexcept {
unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);
return 0;
}
/// Instantiate the entry point matching this tuple shape.
template <class _Tuple, size_t... _Indices>
static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &lb::_Invoke<_Tuple, _Indices...>;
}
/// g++/C++17 port of the generic pool (std::invoke_result_t replaces the
/// MSVC-internal _Invoke_result_t).
class thread_pool{
public:
using _Task = _call_info;
void work();
thread_pool();
/// Type-erase `fir(args...)` into a heap tuple + entry point, enqueue it,
/// and return the future observing its result.
template<typename Fir, typename... Args>
auto submit(Fir&& fir, Args&&... args) ->
future<invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
using _Ret = invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
using _Future = future<_Ret>;
using _Packaged = packaged_task<_Ret(Args...)>;
using _Tuple = tuple<_Packaged, Args...>;
_Packaged callable(fir);
_call_info _call;
_call.addr = (void*)lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
// FIX: take the future *before* publishing the task. Once pushed, a
// worker may run _Invoke -- which deletes p_tuple via unique_ptr --
// concurrently; calling get_future on *p_tuple after push() was a
// use-after-free race.
auto result = std::get<0>(*p_tuple).get_future();
_call.args = p_tuple;
task_queue.push(_call);
cond.notify_all(); // wake the sleeping workers
return result;
}
const vector<thread>& get() const {
return this->threads;
}
/// Raise the stop flag and wake every worker so it can exit.
void over(void) {
finish = true;
cond.notify_all();
}
private:
// NOTE(review): unsynchronized cross-thread access; consider std::atomic<bool>.
bool finish;
mutex empty;               // paired with `cond`; the queue is already thread-safe
condition_variable cond;   // workers sleep here between tasks
threadsafe_queue<_Task> task_queue;
vector<thread> threads;
join_threads j_threads;    // declared last: joins workers on destruction
};
}
#endif
#include<iostream>
namespace lb {
// NOTE(review): this translation unit only includes <iostream>; it also
// needs `#include "thread_pool.h"` to compile on its own (the g++ command
// at the end of the notes builds it separately) -- presumably dropped when
// pasting. TODO confirm.
/// Worker loop: sleep on `cond` until a task arrives or over() is called.
void thread_pool::work() {
while (!finish){
unique_lock<mutex> u_mt(this->empty);
// Wake on shutdown or on a non-empty queue.
cond.wait(u_mt, [this] {
if (this->finish)
return true;
return !this->task_queue.empty();
});
if (finish) {
u_mt.unlock();
continue; // loop condition sees finish==true and exits
}
// The queue has its own lock, so release `empty` before popping.
u_mt.unlock();
_Task tmp;
if (this->task_queue.try_pop(tmp))
// Re-materialize the entry point's signature and run the task;
// _Invoke frees the tuple afterwards.
((uint32_t(__attribute__((__stdcall__)))*)(void*))tmp.addr)(tmp.args);
}
std::cout << "over\n";
}
/// Start one worker per hardware thread.
thread_pool::thread_pool() :finish(false), j_threads(threads) {
try {
// FIX: hardware_concurrency() may return 0 ("unknown"); fall back so
// the pool never starts with zero workers.
unsigned all = thread::hardware_concurrency();
if (all == 0)
all = 2;
threads.reserve(all);
for (; all; --all) {
threads.emplace_back(&thread_pool::work,this);
}
}catch (...) {
// FIX: a failing std::thread constructor throws std::system_error,
// which the old `catch (std::bad_alloc)` (caught by value, wrong type)
// never matched. Stop any started workers and rethrow.
finish = true;
cond.notify_all();
throw;
}
}
}
#ifndef _THREAD_SAFE_QUEUE_H_
#define _THREAD_SAFE_QUEUE_H_
#include<mutex>
#include<queue>
namespace lb {
using namespace std;
// (Same thread-safe queue declaration as earlier; g++ build uses include
// guards instead of #pragma once.)
// NOTE(review): <condition_variable> and <memory> are used but not
// included -- libstdc++ may or may not pull them in transitively; add them
// for portability. TODO confirm.
template<typename T>
class threadsafe_queue
{
private:
mutable mutex mut;
queue<T> data_queue;
condition_variable data_cond;
public:
threadsafe_queue();
void push(const T& data);
void wait_and_pop(T& value);
shared_ptr<T> wait_and_pop();
bool try_pop(T& value);
shared_ptr<T> try_pop();
bool empty() const;
};
}
#endif
#ifndef _THREAD_SAFE_QUEUE_HPP_
#define _THREAD_SAFE_QUEUE_HPP_
#include "threadsafe_queue.h"
#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P
// (Identical to the queue implementation earlier in these notes.)
namespace lb {
_TP::threadsafe_queue(){}
/// Enqueue and wake one waiting consumer.
_T void _P::push(const T& data){
lock_guard<mutex> lk(mut);
data_queue.push(move(data)); // NOTE(review): move on a const ref is a no-op copy
data_cond.notify_one();
}
/// Block until non-empty, then move the front element into `value`.
_T void _P::wait_and_pop(T& value){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = move(data_queue.front());
data_queue.pop();
}
/// Blocking pop returning the element through a shared_ptr.
_T shared_ptr<T> _P::wait_and_pop(){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // 4
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
/// Non-blocking pop; false when empty.
_T bool _P::try_pop(T& value){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return false;
value = move(data_queue.front());
data_queue.pop();
return true;
}
/// Non-blocking pop; empty shared_ptr when the queue is empty.
_T shared_ptr<T> _P::try_pop(){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return shared_ptr<T>(); // 5
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
/// Emptiness snapshot (const thanks to the mutable mutex).
_T bool _P::empty() const{
lock_guard<mutex> lk(mut);
return data_queue.empty();
}
}
#endif
#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;
/// Demo task: sleep 2 s, print one record, return a sentinel int.
int test_a(int num) {
this_thread::sleep_for(chrono::milliseconds(2000));
// One buffer per worker thread -> the single cout insertion below is
// not interleaved between threads.
thread_local ostringstream buf;
// FIX: with ios_base::app, seekp(0) never discards previous content, so
// str() could re-emit stale text from earlier calls on the same worker;
// str("") empties the buffer for real.
buf.str("");
buf << "test_a: \t";
buf << this_thread::get_id();
buf << "\targ(num):\t\t";
buf << num;
buf << "\n";
cout << buf.str();
return 2424;
}
/// Demo task: print one record, return a string literal (static storage).
const char* test_b(const char* cstr) {
thread_local ostringstream buf;
buf.str(""); // same fix as test_a
buf << "test_b: \t";
buf << this_thread::get_id();
buf << "\targ(cstr):\t\t";
buf << cstr;
buf << "\n";
cout << buf.str();
return "yangrui\n";
}
/// g++ test driver: the preprocessor branches select one scenario; only the
/// `#elif 1` branch (futures + batch submit) is active.
int main(int arg, char** args) {
lb::thread_pool p;
#if 0
{
// Scenario 1: member-function task.
// NOTE(review): this branch returns without p.over(), which would leave
// the workers blocked on the condition variable and hang the pool's
// destructor -- call p.over() before the return if enabling it.
struct _B {
int test(const char* arg, int num) {
cout << arg << endl;
cout << num << endl;
return 242;
}
};
_B b;
p.submit(&_B::test, b, "helooooo", 24242);
return 0;
}
#elif 0
{
// Scenario 2: lambda task.
p.submit([](int a, int b) {
cout << "a: " << a << endl;
cout << "b: " << b << endl;
cout << "td: " << this_thread::get_id() << endl;
}, 2, 4);
getchar();
p.over();
return 0;
}
#elif 1
{
// Scenario 3: futures carrying results back from the workers.
auto f_a = p.submit(test_a, 222);
auto f_b = p.submit(test_b, "hello");
cout << f_a.get() << endl;
cout << f_b.get() << endl;
}
// Batch submit to exercise several workers at once.
for (int i = -1; ++i < 10;) {
p.submit(test_a, i* 12);
p.submit(test_b, "杨杨");
}
getchar();
p.over();
#endif
return 0;
}
编译
g++ -ggdb3 main.cpp join_threads.cpp thread_pool.cpp -lpthread -std=c++17