c++多线程04

线程池

概念

  • 线程池 <font color=red>调度机制</font>, 在实际开发中, 将每个任务都交给 <font color=red>某个线程</font>是不切实际的, 可以利用 <font color=red>并行并发</font>为任务指定单独的线程去执行


  • 线程池提供了上面问题所需要的功能:
    • 提交任务到任务队列上
    • 工作线程从队列获取任务, 任务完成后, 再从任务队列中获取下一个任务


  • 线程池几个关键的问题:
    • 线程数量
    • 高效的任务分配方式
    • 是否阻塞(==即等待一个任务完成==)


简单的线程池(==MSVC==)

  • 需求
1. 数量和当前on的环境匹配
2. 挂载任务到任务队列
3. 工作线程从任务队列获取任务, 执行..., 执行完毕后再回来获取新的任务
ps: 线程池中线程不需要等待其他线程, 如果需要等待, 要进行同步管理


  • demo结构
    19.png


  • 线程安全的队列(==threadsafe_queue==)
/** 
    .h文件, 用的时候不要导入这个
    直接在main.cpp中导入hpp
*/
#pragma once
#include<mutex>
#include<queue>
namespace lb {
    using namespace std;

template<typename T>
class threadsafe_queue
{
private:
    mutable mutex mut;
    queue<T> data_queue;
    condition_variable data_cond;

public:
    threadsafe_queue();

    void push(const T& data);

    void wait_and_pop(T& value);

    shared_ptr<T> wait_and_pop();

    bool try_pop(T& value);

    shared_ptr<T> try_pop();

    bool empty() const;
};

}





/** 
    hpp
*/
#include "threadsafe_queue.h"

#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P

namespace lb {

_TP::threadsafe_queue(){}

_T void _P::push(const T& data){
    lock_guard<mutex> lk(mut);
    data_queue.push(move(data));
    data_cond.notify_one();  
}

_T void _P::wait_and_pop(T& value){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });
    value = move(data_queue.front());
    data_queue.pop();
}

_T shared_ptr<T> _P::wait_and_pop(){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::try_pop(T& value){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return false;
    value = move(data_queue.front());
    data_queue.pop();
    return true;
}

_T shared_ptr<T> _P::try_pop(){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return shared_ptr<T>();  // 5
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::empty() const{
    lock_guard<mutex> lk(mut);
    return data_queue.empty();
}


}


  • 辅助类(==join_threads==)
/** 
    join最合适的位置是在 main函数结束以前
*/

//.h
#pragma once
#include<vector>
#include<thread>

namespace lb {

using namespace std;

class join_threads
{
public:
     explicit join_threads(vector<thread>& threads_);
     ~join_threads();
    
     const vector<thread>& get() const{
         return threads;
     }
private:
    vector<thread>& threads;
};

}



///cpp
#include "join_threads.h"

namespace lb {

join_threads::join_threads(vector<thread>& threads_):threads(threads_){}

join_threads:: ~join_threads(){
    for (unsigned long i = 0; i < threads.size(); ++i){
        if (threads[i].joinable())
            threads[i].join();
    }
}

}


  • 线程池的实现
//// .h
#pragma once

#include<vector>
#include<functional>

#include"join_threads.h"
#include"threadsafe_queue.hpp"      //注意这里导入的是hpp, 不要导入头文件

namespace lb {
    using namespace std;

class thread_pool{
public:
    using _Task = function<void(void)>;
    
    
    void work();

    thread_pool();

    const vector<thread>& get() const {
        return this->threads;
    }

    void submit(const _Task& task) {
        this->task_queue.push(task);
    }

    void over(void) {
        finish = true;
    }
private:
    bool finish;                            //_code_a
    threadsafe_queue<_Task> task_queue;     //_code_b
    vector<thread> threads;                 //_code_c
    join_threads j_threads;                 //_code_d

    /** 
        这里的析构顺序很重要:
            _code_d要最先析构

        因为join_threads的作用是join所有的子线程
        这就表示了在不设置finish为true的情况下
        当线程池pool死亡时, 主线程会无限等待(因为join了)
        
        j_threads析构的时候必须访问threads中的线程, 所以
        threads不能在j_threads前析构
    */
};
}






/// cpp
#include "thread_pool.h"
namespace lb {
    void thread_pool::work() {

        while (!finish){
            _Task tmp;
            if (this->task_queue.try_pop(tmp)) {
                tmp();
                continue;
            }
            this_thread::yield();
        }
    }

    thread_pool::thread_pool() :finish(false), j_threads(threads) {
        try {
            // 根据当前硬件, 创建合适的线程数量
            int all = thread::hardware_concurrency();
            
            threads.reserve(all);

            for (; all; --all) {
                threads.emplace_back(&thread_pool::work,this);
            }
        }catch (std::bad_alloc) {
            // 如果发生异常(thread构造失败), 设置finish为true, 已经开始的线程会停掉
            finish = true;
            throw;
        }
    }
}


  • 测试代码
#define _CRT_SECURE_NO_WARNINGS

#include<iostream>
#include<thread>
#include<future>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;


void task_a(void) {
    this_thread::sleep_for(chrono::milliseconds(2000));
    thread_local ostringstream buf(std::ios_base::app);
    buf.clear();
    buf << this_thread::get_id() << " task_a()\n";
    cout << buf.str();
}

void task_b(void) {
    thread_local ostringstream buf(std::ios_base::app);
    buf << this_thread::get_id();
    buf << " task_b\n";
    cout << buf.str();
    buf.seekp(0);
}


#define version_1 1
int main(int arg, char** args) {

#ifdef version_1

    lb::thread_pool p;

    for (int i = -1; ++i < 10;) {
        p.submit(function<void(void)>(task_a));
        p.submit(function<void(void)>(task_b));
    }

    system("pause");    //为了看到打印
    p.over();             
}


简单线程池中的问题

1. 线程池中的线程在不断循环判断

2. 上述只是 一个 void(void)的函数, 而实际开发中:
    用户的函数可能有返回值, 参数也是多样的


思路:
    对于第1个问题:
        子线程没有任务的时候应该是挂起的
        当main提交任务后, 是唤醒thread_pool中挂起的线程, 做完事后再挂起

    对于第2个问题分2步:
        用户的函数是多种多样的:
            返回值不定
            参数个数不定, 参数类型不定
        
        所以必须用到泛型
        返回值势必用到 future和packaged_task


子线程改为wait

//修改threads_pool的成员变量
class thread_pool{
public:

    ....


private:
    bool finish;
    mutex empty;
    condition_variable cond;
    threadsafe_queue<_Task> task_queue;
    vector<thread> threads;
    join_threads j_threads;
};


// 修改work和submit的函数
void submit(const _Task& task) {
    this->task_queue.push(task);

    // 唤醒所有的线程去抢任务
    cond.notify_all();
}


void thread_pool::work() {
    while (!finish){
        /** 
            当结束(finish为true 或 队列不空的时候被唤醒)
            唤醒后, wait会对成员emtpy上锁(mutex对象)
            但队列本身是线程安全的, 所以wait后, 再手动解锁
        */
        unique_lock<mutex> u_mt(this->empty);
        cond.wait(u_mt, [this] {
            if(this->finish)
                return true;
            return !this->task_queue.empty();
        });

        if(finish){
            u_mt.unlock();
            continue;
        }
        u_mt.unlock();

        _Task tmp;
        
        if (this->task_queue.try_pop(tmp)) 
            tmp();
    }
}



void over(void) {
    finish = true;
    cond.notify_all();      //记得通知所有的线程
}


返回值的解决思路

  • 因为是异步的, 所以用前面的 <font color=red>future</font>, 即 <font color=red>submit返回一个future</font>


  • 先来看看, 怎么获取 <font color=red>callable</font>的返回值==类型==
/** 
    在以前分析 std::async的时候, STL中用到过 对callable的返回值traits
    
    举例
*/
#include<type_traits>
#include<typeinfo>


int main(int arg, char** args){
    using _Ret = _Invoke_result_t<
        decay_t<decltype(&main)>, 
        decay_t<int>, 
        decay_t<char**>
        >;

    
    // traits出main函数的返回值, 所以 _Ret的类型是int
    /** 
        ps: decltype(main) 和 decltype(&main)的类型是不一样的
            decltype(&main)是 int(*)(int,char**)    // 函数指针
            decltype(main) 是 int(int,char**)       // 像function<type>的模板参数
    */


    /// decay会将 int[]的类型转换成int*(在以前也说过)
    cout << typeid(decay_t<int[]>).name() << endl;


    // lambda的类型
    auto tmp = [](int a, char**, int[], int len) ->int {
            return 0;
    };

    // traits出lambda的返回值, 所以_Lambda_Ret的类型是int
    using _Lambda_Ret = _Invoke_result_t< 
        decay_t<decltype(tmp)>,
        decay_t<int>,
        decay_t<char**>,
        decay_t<int[]>,
        decay_t<int>
        >;
}


tuple妙用(==任务参数的多样性==)

/** 
    回想thread的构造函数, 它的过程就不说了, 可以用thread的构造函数来
    解决参数不定以及返回值的问题, 这里给出一个demo
    是模仿thread的构造函数, 功能是将用户的 函数存储起来, 想在什么时候调用
    就什么时候调用
*/
namespace _tt {

    ///生成的调用函数
    template<typename Callable, typename... Args>
    void invoke(Callable&& callable, Args&&... args) {
        callable(args...);
    }


    
    /// 中间转换的函数, 外界统一从这里入口
    //// 注意这里用的是 __stdcall的压栈模式
    template <class _Tuple, size_t... _Indices>
    static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
        _Tuple* _FnVals = (_Tuple*)_RawVals;
        _Tuple& _Tup = *_FnVals;
        printf("--- _Tup: %p\n", &_Tup);
        _tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
        return 0;
    }


    /// 根据用户传入的 callable和不定的参数, 生成具体的入口调用函数
    template <class _Tuple, size_t... _Indices>
    _NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
        return &_Invoke<_Tuple, _Indices...>;
    }


    
    ///测试是用了2个全局的指针
    /// 根据用户的callable和不定参数, 保存对应的入口地址
    static void* _invoke_address = nullptr;

    ///参数的信息(tuple*)
    static void* _args_address = nullptr;

    
    template<typename Fir, typename... Args>
    auto fun(Fir&& fir, Args&&... args) ->
    future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> { //模仿async, 返回类型为future<int>

        // 用户函数的返回类型 int
        using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
        using _Future = future<_Ret>;

        /// 将callable包装成 packaged_task
        using _Packaged = packaged_task<_Ret(Args...)>;

        /// 将packaged_task和用户函数的参数, 保存到tuple中
        using _Tuple = tuple<_Packaged, Args...>;

        /// 包装fir函数为packaged_task
        _Packaged callable(fir);

        /// 根据tuple(t_fun,2), 生成入口函数的签名地址, 保存到全局变量(测试)
        _invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

        /// 储存t_fun和参数信息(2)
        auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);

        //用全局变量记住, 后期threadsafe_queue中保存的就是这种类型的指针
        _args_address = p_tuple;


        /// 将packaged的future给外界, 外界可以get()
        auto result(std::get<0>(*p_tuple).get_future());
        return result;
    }
}





/** 
    测试1, 在子线程中执行任务
*/
int t_fun(int) {
    cout << "hello\n";
    return 0;
}
int main(int arg, char** args) {
    {
        // 外界注册任务,并拿到future
        auto tmp = _tt::fun(t_fun, 2);

        // 模拟async中用户的任务被os调用, 这里用一个子线程, 在线程中执行上面的 t_fun
        thread t_([&tmp] {

            // 需要说明的是, 入口函数的签名是 unsigned int __stdcall _Invoke(void*)
            //// 全局变量(存储入口函数地址)的类型是 void*, 所以必须指定和上面一致的签名
            ////// MSVC中默认的并不是 __stdcall, 所以要显示指定压栈方式
            ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
            std::cout << this_thread::get_id  << " "<< tmp.get() << endl;
        });

        t_.join();

        return 0;
    }
}    



/** 
    测试2, 不同的用户函数(签名不一样)
*/
const char* test(const char*) {
    cout << "test()\n";
    return "yangrui\n";
}

int main(int arg, char** args){
    auto tmp = _tt::fun(t_fun, 2);

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    std::cout << this_thread::get_id  << " "<< tmp.get() << endl;


    auto tmp2 = _tt::fun(test, 22);

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    std::cout << this_thread::get_id  << " "<< tmp2.get() << endl;
}



/** 
    在上面的测试中, 有内存泄露 tuple* 没有被释放
*/


解决内存泄露(==完整的代码==)

namespace _tt {

    template<typename Callable, typename... Args>
    void invoke(Callable&& callable, Args&&... args) {
        callable(args...);
    }

    template <class _Tuple, size_t... _Indices>
    static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
        unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
        _Tuple& _Tup = *_FnVals;
        printf("--- _Tup: %p\n", &_Tup);
        _tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
        return 0;
    }

    template <class _Tuple, size_t... _Indices>
    _NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
        return &_Invoke<_Tuple, _Indices...>;
    }


    static void* _invoke_address = nullptr;
    static void* _args_address = nullptr;


    template<typename Fir, typename... Args>
    auto fun(Fir&& fir, Args&&... args) ->
    future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
        using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
        using _Future = future<_Ret>;

        using _Packaged = packaged_task<_Ret(Args...)>;
        using _Tuple = tuple<_Packaged, Args...>;

        _Packaged callable(fir);

        _invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

        auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
        _args_address = p_tuple;



        auto result(std::get<0>(*p_tuple).get_future());
        return result;
    }
}






int t_fun(int) {
    cout << "hello\n";
    return 2424;
}



const char* test(const char*) {
    cout << "test()\n";
    this_thread::sleep_for(chrono::seconds(5));
    return "yangrui\n";
}

int main(int arg, char** args) {
    auto tmp = _tt::fun(t_fun, 2);

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    //std::cout << this_thread::get_id  << " "<< tmp.get() << endl;


    auto tmp2 = _tt::fun(test, "lu");

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    std::cout << this_thread::get_id  << " "<< tmp2.get() << endl;
}
在上面的static unsigned int __stdcall _Invoke(void* _RawVals) noexcept 
中使用unique_ptr来包装tuple*

这里面的内存释放关乎2个new:
    1. tuple*
    2. 在创建任务时(fun函数内部) packaged_task内部的_Packaged_state<...> *

首先tuple*:
    0. 它的作用是 存储callable(packaged_task)和用户的所有参数

    1. fun函数内部被创建

    2. 内部的callable间接引用着 _Pactaged_state<..> *
    
    一个好释放理由是, 自己释放, 不要让用户手动释放 
    所以释放tuple*的1个设计是 在调用完 _invoke 后, 应该自动释放tuple*
    方法就是在_invoke中对 tuple* 做unique, 具体见上面的代码


其次_Packaged_state<>*:
    0. 它在本例中的作用是 存储用户的返回值
    
    1. fun内部会创建 packaged_task对象, 进而创建_Packaged_state*

    2. 当创建 tuple*的时候, 会将 _Packaged_state* 转移给tuple<0>

    3. 当用tuple<0>获取result(future)时, result也指向_Packaged_state*

    4. 当fun函数结束, 先拷贝构造临时对象tmp(future), result中的_Packaged_state*被转移到tmp
        4.1 tmp如果没有外界接收, 会被析构, 进而可能会delete _Packaged_state*
            但此时有2个对象引用 _Packaged_state(tuple<0>和tmp), 所以tmp只是将计数-1, 并不会释放_Packaged_state*

        4.2 有外界接收, 则tmp被外界引用, 不会析构

      ps: _Packaged_state的new指针是使用了引用计数原理(以前没有说过, 这里提一下), 但不是STL中的share_ptr
         

    5. 接着释放局部的packged_task对象, 但发现_Packaged_state已经为空
        所以不会释放_Packaged_state


    6. main函数中调用完fun后, 借用了全局指针调用 _invoke 函数
        传递的指针其实就是 tuple*
        _invoke内部会拿到callable(packaged_task)调用 invoke函数
        invoke的调用会触发到 callable的重载(), 并在调用结束后存储返回值到 packaged_state*指向的对象中
        这个过程和以前探讨的 packgade_task源码是一样的
        此时如果外界通过 packaged_task的future对象获取返回值(get())时的过程就不说了, 前面源码中已经很
        详细了
    
    7. 在 _invoke内部, 用unique<tuple> 包装了tuple*
        7.1 函数完毕后, 会释放 unique<tulpe>, 进而释放tuple中的callable
            callable指向的_Packgade_state*也会根据计数器决定要不要释放

    8. 如果main函数中 接收第4步fun函数返回的future对象(tmp, tmp2), 则main函数结束后
        会释放tmp, tmp2, 它们内部都引用_Packaged_state*, 但也会根据计数器来释放_Packaged_state*


改进前面的线程池

  • threadsafe_queue
#pragma once
#include<mutex>
#include<queue>
namespace lb {
    using namespace std;

template<typename T>
class threadsafe_queue
{
private:
    mutable mutex mut;
    queue<T> data_queue;
    condition_variable data_cond;

public:
    threadsafe_queue();

    void push(const T& data);

    void wait_and_pop(T& value);

    shared_ptr<T> wait_and_pop();

    bool try_pop(T& value);

    shared_ptr<T> try_pop();

    bool empty() const;
};

}





#include "threadsafe_queue.h"

#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P

namespace lb {
_TP::threadsafe_queue(){}

_T void _P::push(const T& data){
    lock_guard<mutex> lk(mut);
    data_queue.push(move(data));
    data_cond.notify_one();  
}

_T void _P::wait_and_pop(T& value){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });
    value = move(data_queue.front());
    data_queue.pop();
}

_T shared_ptr<T> _P::wait_and_pop(){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::try_pop(T& value){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return false;
    value = move(data_queue.front());
    data_queue.pop();
    return true;
}

_T shared_ptr<T> _P::try_pop(){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return shared_ptr<T>();  // 5
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::empty() const{
    lock_guard<mutex> lk(mut);
    return data_queue.empty();
}


}


  • join_thread
#pragma once
#include<vector>
#include<thread>

namespace lb {

using namespace std;

class join_threads
{
public:
     explicit join_threads(vector<thread>& threads_);
     ~join_threads();
    
     const vector<thread>& get() const{
         return threads;
     }
private:
    vector<thread>& threads;
};

}



#include "join_threads.h"

namespace lb {

join_threads::join_threads(vector<thread>& threads_):threads(threads_){}

join_threads:: ~join_threads(){
    for (unsigned long i = 0; i < threads.size(); ++i){
        if (threads[i].joinable())
            threads[i].join();
    }
}

}


  • thread_pool
#pragma once

#include<vector>
#include<future>
#include<type_traits>

#include"join_threads.h"
#include"threadsafe_queue.hpp"

namespace lb {

using namespace std;

struct _call_info {
    void* addr;
    void* args;
};



template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
    callable(args...);
}

template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept {
    unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));

    _Tuple& _Tup = *_FnVals;

    lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);

    return 0;
}

template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
    return &lb::_Invoke<_Tuple, _Indices...>;
}










class thread_pool{
public:
    using _Task = _call_info;
    
    void work();
    thread_pool();


    template<typename Fir, typename... Args>
    auto submit(Fir&& fir, Args&&... args) ->
        future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
        using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
        using _Future = future<_Ret>;

        using _Packaged = packaged_task<_Ret(Args...)>;
        using _Tuple = tuple<_Packaged, Args...>;

        _Packaged callable(fir);

        _call_info _call;

        _call.addr = lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

        auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
        _call.args = p_tuple;

        task_queue.push(_call);
        cond.notify_all();

        return std::get<0>(*p_tuple).get_future();
    }


    const vector<thread>& get() const {
        return this->threads;
    }

    void over(void) {
        finish = true;
        cond.notify_all();
    }
private:
    bool finish;
    mutex empty;
    condition_variable cond;
    threadsafe_queue<_Task> task_queue;
    vector<thread> threads;
    join_threads j_threads;
};



}






#include "thread_pool.h"
#include<iostream>

namespace lb {
    


    void thread_pool::work() {

        while (!finish){
            unique_lock<mutex> u_mt(this->empty);
            cond.wait(u_mt, [this] {
                if (this->finish)
                    return true;
                return !this->task_queue.empty();
            });
            if (finish) {
                u_mt.unlock();
                continue;
            }
            u_mt.unlock();

            _Task tmp;
            
            if (this->task_queue.try_pop(tmp)) 
                ((unsigned int(__stdcall*)(void*))tmp.addr)(tmp.args);
        }
        std::cout << "over\n";
    }

    thread_pool::thread_pool() :finish(false), j_threads(threads) {
        try {
            int all = thread::hardware_concurrency();
            threads.reserve(all);

            for (; all; --all) {
                threads.emplace_back(&thread_pool::work,this);
            }
        }catch (std::bad_alloc) {
            finish = true;
            throw;
        }
    }
}```

<br>

- 测试
```cpp
#define _CRT_SECURE_NO_WARNINGS

#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;

int test_a(int num) {
    this_thread::sleep_for(chrono::milliseconds(2000));
    thread_local ostringstream buf(std::ios_base::app);

    buf << "test_a: \t";
    buf << this_thread::get_id();
    buf << "\targ(num):\t\t";
    buf << num;
    buf << "\n";
    cout << buf.str();
    buf.seekp(0);

    return 2424;
}

const char* test_b(const char* cstr) {
    thread_local ostringstream buf(std::ios_base::app);

    buf << "test_b: \t";
    buf << this_thread::get_id();
    buf << "\targ(cstr):\t\t";
    buf << cstr;
    buf << "\n";
    cout << buf.str();
    buf.seekp(0);

    return "yangrui\n";
}

int main(int arg, char** args) {
    lb::thread_pool p;
    {
        struct _B {
            int test(const char* arg, int num) {
                cout << arg << endl;
                cout << num << endl;
                return 242;
            }

        };
        _B b;
        p.submit(&_B::test, b, "helooooo", 24242);
        return 0;
    }

    {
    
        p.submit([](int a, int b) {
            cout << this_thread::get_id() << endl;
        }, 2, 4);
        
        getchar();

        p.over();
        return 0;
    }

    { 
        auto f_a = p.submit(test_a, 222);
        auto f_b = p.submit(test_b, "hello");

        cout << f_a.get() << endl;
        cout << f_b.get() << endl;
    }

    for (int i = -1; ++i < 10;) {
        p.submit(test_a, i* 12);
        p.submit(test_b, "杨杨");
    }

    system("pause");    //为了看到打印
    p.over();
}


修改为g++下的线程池

说明(C++17下编译)

#ifndef _JOIN_THREAD_H_
#define _JOIN_THREAD_H_
#include<vector>
#include<thread>

namespace lb {

using namespace std;

class join_threads
{
public:
         explicit join_threads(vector<thread>& threads_);
         ~join_threads();

         const vector<thread>& get() const{
                 return threads;
         }
private:
        vector<thread>& threads;
};

}

#endif




#include "join_threads.h"

namespace lb {

join_threads::join_threads(vector<thread>& threads_):threads(threads_){}

join_threads:: ~join_threads(){
        for (unsigned long i = 0; i < threads.size(); ++i){
                if (threads[i].joinable())
                        threads[i].join();
        }
}















#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<vector>
#include<future>
#include<type_traits>

#include"join_threads.h"
#include"threadsafe_queue.hpp"

#define __stdcall __attribute__((__stdcall__))
#define _STD std::


namespace lb {

using namespace std;

struct _call_info {
        void* addr;
        void* args;
};



template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
        callable(args...);
}

template <class _Tuple, size_t... _Indices>
static uint32_t __stdcall _Invoke(void* _RawVals) noexcept {
        unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));

        _Tuple& _Tup = *_FnVals;

        lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);

        return 0;
}

template <class _Tuple, size_t... _Indices>
static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
        return &lb::_Invoke<_Tuple, _Indices...>;
}










class thread_pool{
public:
        using _Task = _call_info;

        void work();
        thread_pool();


        template<typename Fir, typename... Args>
        auto submit(Fir&& fir, Args&&... args) ->
                future<invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
                using _Ret = invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
                using _Future = future<_Ret>;

                using _Packaged = packaged_task<_Ret(Args...)>;
                using _Tuple = tuple<_Packaged, Args...>;

                _Packaged callable(fir);

                _call_info _call;

                _call.addr = (void*)lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

                auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
                _call.args = p_tuple;

                task_queue.push(_call);
                cond.notify_all();

                return std::get<0>(*p_tuple).get_future();
        }


        const vector<thread>& get() const {
                return this->threads;
        }

        void over(void) {
                finish = true;
                cond.notify_all();
        }
private:
        bool finish;
        mutex empty;
        condition_variable cond;
        threadsafe_queue<_Task> task_queue;
        vector<thread> threads;
        join_threads j_threads;
};



}

#endif




#include<iostream>

namespace lb {



        void thread_pool::work() {

                while (!finish){
                        unique_lock<mutex> u_mt(this->empty);
                        cond.wait(u_mt, [this] {
                                if (this->finish)
                                        return true;
                                return !this->task_queue.empty();
                        });
                        if (finish) {
                                u_mt.unlock();
                                continue;
                        }
                        u_mt.unlock();

                        _Task tmp;

                        if (this->task_queue.try_pop(tmp)) 
                                ((uint32_t(__attribute__((__stdcall__))*)(void*))tmp.addr)(tmp.args);
                }
                std::cout << "over\n";
        }

        thread_pool::thread_pool() :finish(false), j_threads(threads) {
                try {
                        int all = thread::hardware_concurrency();
                        threads.reserve(all);

                        for (; all; --all) {
                                threads.emplace_back(&thread_pool::work,this);
                        }
                }catch (std::bad_alloc) {
                        finish = true;
                        throw;
                }
        }
}













#ifndef _THREAD_SAFE_QUEUE_H_
#define _THREAD_SAFE_QUEUE_H_
#include<mutex>
#include<queue>
namespace lb {
        using namespace std;

template<typename T>
class threadsafe_queue
{
private:
        mutable mutex mut;
        queue<T> data_queue;
        condition_variable data_cond;

public:
        threadsafe_queue();

        void push(const T& data);

        void wait_and_pop(T& value);

        shared_ptr<T> wait_and_pop();

        bool try_pop(T& value);

        shared_ptr<T> try_pop();

        bool empty() const;
};

}

#endif



#ifndef _THREAD_SAFE_QUEUE_HPP_
#define _THREAD_SAFE_QUEUE_HPP_
#include "threadsafe_queue.h"

#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P

namespace lb {
_TP::threadsafe_queue(){}

_T void _P::push(const T& data){
        lock_guard<mutex> lk(mut);
        data_queue.push(move(data));
        data_cond.notify_one();  
}

_T void _P::wait_and_pop(T& value){
        unique_lock<mutex> lk(mut);
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        value = move(data_queue.front());
        data_queue.pop();
}

_T shared_ptr<T> _P::wait_and_pop(){
        unique_lock<mutex> lk(mut);
        data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
        shared_ptr<T> res(
                make_shared<T>(move(data_queue.front())));
        data_queue.pop();
        return res;
}

_T bool _P::try_pop(T& value){
        lock_guard<mutex> lk(mut);
        if (data_queue.empty())
                return false;
        value = move(data_queue.front());
        data_queue.pop();
        return true;
}

_T shared_ptr<T> _P::try_pop(){
        lock_guard<mutex> lk(mut);
        if (data_queue.empty())
                return shared_ptr<T>();  // 5
        shared_ptr<T> res(
                make_shared<T>(move(data_queue.front())));
        data_queue.pop();
        return res;
}

_T bool _P::empty() const{
        lock_guard<mutex> lk(mut);
        return data_queue.empty();
}


}

#endif








#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;

int test_a(int num) {
        this_thread::sleep_for(chrono::milliseconds(2000));
        thread_local ostringstream buf(std::ios_base::app);

        buf << "test_a: \t";
        buf << this_thread::get_id();
        buf << "\targ(num):\t\t";
        buf << num;
        buf << "\n";
        cout << buf.str();
        buf.seekp(0);

        return 2424;
}

const char* test_b(const char* cstr) {
        thread_local ostringstream buf(std::ios_base::app);

        buf << "test_b: \t";
        buf << this_thread::get_id();
        buf << "\targ(cstr):\t\t";
        buf << cstr;
        buf << "\n";
        cout << buf.str();
        buf.seekp(0);

        return "yangrui\n";
}

int main(int arg, char** args) {
    lb::thread_pool p;
#if 0
        {
                struct _B {
                        int test(const char* arg, int num) {
                                cout << arg << endl;
                                cout << num << endl;
                                return 242;
                        }

                };
                _B b;
                p.submit(&_B::test, b, "helooooo", 24242);
                return 0;
        }
#elif 0
        {

                p.submit([](int a, int b) {
                        cout << "a: " << a << endl;
                        cout << "b: " << b << endl;
                        cout << "td: " << this_thread::get_id() << endl;
                }, 2, 4);

                getchar();

                p.over();
                return 0;
        }
#elif 1
        { 
                auto f_a = p.submit(test_a, 222);
                auto f_b = p.submit(test_b, "hello");

                cout << f_a.get() << endl;
                cout << f_b.get() << endl;
        }

        for (int i = -1; ++i < 10;) {
                p.submit(test_a, i* 12);
                p.submit(test_b, "杨杨");
        }

        getchar();
        p.over();
#endif

        return 0;
}

编译

g++ -ggdb3 main.cpp join_threads.cpp thread_pool.cpp -lpthread -std=c++17
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容