手写线程池: thread pool with modern c++

C++线程池

进程的创建和销毁,代价是昂贵的,除去操作系统的实现及其本身的原理,跟线程相比它更显得重量级。
这几年互联网的迅速发展,让线程正面临着跟进程一样的“重量级”困扰。尤其是GO等语言推出协程(纤程)
后,线程更是不堪其重。那么有没有改进的方向呢?有,将线程池化——线程池。

由于C++版本推进的历程(C++98, C++03, C++11, C++14, C++17, C++20)以及其弱鸡般的ABI兼容性,导致很多框架用起来得自己造轮子。C++版本再吐槽一句好了, C++即使版本推进到0xff, 对很多人来说还是c with class, 包括我。
我们的目标是, 造一个很Java中的ThreadPool类似的线程池。目前的进度:

  • [x] 极其简易的线程池(header only)
  • [x] 支持设置线程池核心线程数
  • [ ] 支持设置线程池最大线程数
  • [ ] 支持设置最大缓存的任务数
  • [ ] 支持设置任务提交拒绝策略


线程池中的概念:

  • job: 需要在线程中执行的代码
    • 例如: void read(const string & path, const HANDLE handle);
    • 该函数从文本中读取内容然后交给窗口渲染到界面上
  • task: 将job封装成一个task, 由于job的函数签名各异,所以需要封装(Java的job是Runnable,接口签名一致)。
    • 例如: auto task = [=]()->void { return read(path, handle); }
    • 这样就将签名各异的job统一封装成了std::function<void()>类型的task
    • 通过std::packaged_task和std::future处理job被异步调用的返回值
  • queue: 缓存task的队列, 队列操作和线程池中的线程耦合度很高, 原因如下:
    • 队列中的任务少时, 池中的空闲线程如何做到真正的不占用cpu?
      • 目前此项目是通过std::condition_variable的条件判断让空闲线程阻塞从而让出cpu
      • Java中是通过实现BlockQueue实现的,也就是队列中没有任务时,线程从队列中get会阻塞, 从而让出cpu
      • 也可以通过信号量 互斥量实现
    • 队列read write操作时, 可根据现实情况实现读优先、写优先的锁来平衡队列task的生产和消费, 目前此项目不支持
    • 设置queue的最大缓存task数
    • 为什么采取队列, 是为了保证task被执行的优先级(队列可以保证先提交的task被先执行,但是不保证先提交的task被先执行完)
  • thread: 采用C++11 标准库中的std::thread
    • 根据std::thread::hardware_concurrency()获取cpu数量进行任务cpu友好性优化, 目前此项目不支持
    • 设置thread的cpu亲和性优化程序执行(Windows平台:通过SetThreadAffinityMask指定线程在cpu哪个核心上运行)

Code

Code On GitHub

#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H

#include <memory>
#include <functional>
#include <future>
#include <condition_variable>
#include <queue>
#include <algorithm>
#include <vector>
#include <thread>
#include <typeinfo>
/**
 * 几个需要注意的点:
 * 1、tasks的读写锁需要优化成带优先级的锁, 可以肯定线程池的绝大部分使用场景commit task比run task更密集
 * 2、根据tasks以及cpu扩展线程数
 * 3、支持允许缓存的task数,如果超出此数将采取拒绝策略
 * 4、拒绝策略
*/
class ThreadPool{
public:

    ThreadPool(int core, int max = 0, int cache = 0): core(core),//由于max和cache暂时没用到,因此赋值0
                    max(max), cache(cache), quit(false), force(false){
        
    }  

    ~ThreadPool(){
        this->quit.store(true);
        this->enable.notify_all();
        std::for_each(this->pool.begin(), this->pool.end(), [](std::thread & t){
            if(t.joinable()){
                t.join();
            }
        });
    }
public:
    void start(){
        for(auto idx = 0; idx < core; ++idx){
            pool.push_back(std::thread([this](){
                // 第一次退出,判断是否要强制退出
                bool quit = this->force.load() ? this->quit.load() : false;
                for(; !quit;){
                    std::unique_lock<std::mutex> lock(this->oper_lock);
                    this->enable.wait(lock, [this](){
                        return this->quit.load() || !this->tasks.empty();
                    });
                    // 不是强制退出时可从这里退出
                    if(this->quit.load() && this->tasks.empty()){
                        return;
                    }
                    std::function<void()> task = std::move(this->tasks.front());
                    this->tasks.pop();

                    task();
                }
            }));
        }
    }

    void shutdown(bool force = false){
        this->quit.store(true);
        this->force.store(force);
    }

    //void commit(std::function<void (void * param)> task);
    template<class T, class... Args>
    auto commit(T && t, Args&&...args)->std::future<decltype(t(args...))>{
        using TYPE = decltype(t(args...));
        if(this->quit.load()){
            //dont know return what, so throw an exception
            throw std::runtime_error("thread pool is alreay shutdown.");
        }
        // 1、std::packaged_task<decltype(f(args...))() 类似std::function\
            但是会将其封装的可调用元素的结果封装在std::future中
        // 2、std::make_shared 创建std::packaged_task<decltype(f(args...))()\
            类型的智能指针
        // 3、std::bind(std::forward<T>(t), std::forward<Args>(args)...)当做\
            std::packaged_task的构造参数
        auto task = std::make_shared<std::packaged_task<TYPE()> >( 
            std::bind(std::forward<T>(t), std::forward<Args>(args)...)
        );
        std::future<TYPE> result = task->get_future();
        std::lock_guard<std::mutex> lock(this->oper_lock);
        //将packaged_task 包裹在一个签名为void()的lambda函数中调用,因为此lambda函数符合std::function<void()>\
            的签名,所以可以放到queue中
        this->tasks.emplace([task](){
            (*task)();  //调用packaged_task
        });
        this->enable.notify_one();  // 在线程池中唤醒一个休眠的线程
        return result;
    }
private:
    //void move();



private:
    std::vector<std::thread> pool;
    std::queue<std::function<void()> > tasks;
    int core;   //线程池核心线程数
    int max;    //线程池根据tasks量以及cpu数最大可扩展的量
    int cache;  //运行tasks可缓存的最大task数,超出次数后commit将采取拒绝策略

    std::atomic<bool> quit;     //线程池shutdown条件, true时shutdown
    std::atomic<bool> force;    //是否强制shutdown,true时有剩余的task将不执行直接退出, false时等待执行完所有的task再退出
    std::condition_variable enable;     //
    std::mutex oper_lock;   // queue的读写锁
};
#endif

Test Code

#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>

#include "./pool/ThreadPool.hpp"

int main(int argc, char** argv){
    ThreadPool pool(4);
    pool.start();
    std::default_random_engine rd;
    std::uniform_int_distribution<int> rang(100, 1000);
    for(int idx = 0; idx < 20; ++idx){
        pool.commit([=](int x, int y, int t){
            std::cout << "thread id : " << std::this_thread::get_id() 
                << " x = " << x << " y = " << y <<
                " sleep time = " << t << " ms" <<
                " id = " << idx << std::endl;
                
            std::this_thread::sleep_for(std::chrono::milliseconds(t));
        }, rang(rd), rang(rd), rang(rd));
    }
    std::vector<std::future<int> > results;
    for (auto index = 20; index < 50; ++index){
        results.push_back(
            pool.commit([=]()->int{
                return index;
            })
        );
    }
    for ( auto & r : results){
        std::cout << "get result from thread "
        << " index = " << r.get() << std::endl;
    }
    char command = std::cin.get();
    if (command == 'q'){
        pool.shutdown(true);
    }else if (command == 'e'){
        pool.shutdown(true);
        try
        {
            pool.commit([](){
                std::cout << "i want to get an exception" << std::endl;
            });
        }
        catch(const std::exception& e)
        {
            std::cerr << e.what() << '\n';
        } 
    }
    std::cout << "test finish, OY!" << std::endl;
    return 0;
}

Compile & Link

g++ -g -O3 -Wall -std=c++11 main.cpp -o ./out/test

Run

thread id : 0x70000a352000 x = 242 y = 937 sleep time = 480 ms id = 0
thread id : 0x70000a352000 x = 340 y = 390 sleep time = 692 ms id = 1
thread id : 0x70000a352000 x = 188 y = 294 sleep time = 738 ms id = 2
thread id : 0x70000a352000 x = 390 y = 978 sleep time = 270 ms id = 3
thread id : 0x70000a4db000 x = 432 y = 780 sleep time = 102 ms id = 4
thread id : 0x70000a458000 x = 652 y = 661 sleep time = 498 ms id = 5
thread id : 0x70000a3d5000 x = 839 y = 452 sleep time = 487 ms id = 6
thread id : 0x70000a352000 x = 698 y = 540 sleep time = 183 ms id = 7
thread id : 0x70000a4db000 x = 157 y = 983 sleep time = 638 ms id = 8
thread id : 0x70000a3d5000 x = 232 y = 823 sleep time = 766 ms id = 9
thread id : 0x70000a458000 x = 801 y = 411 sleep time = 314 ms id = 10
thread id : 0x70000a458000 x = 359 y = 912 sleep time = 294 ms id = 11
thread id : 0x70000a458000 x = 260 y = 142 sleep time = 372 ms id = 12
thread id : 0x70000a458000 x = 618 y = 499 sleep time = 831 ms id = 13
thread id : 0x70000a458000 x = 108 y = 319 sleep time = 376 ms id = 14
thread id : 0x70000a3d5000 x = 870 y = 490 sleep time = 519 ms id = 15
thread id : 0x70000a352000 x = 446 y = 998 sleep time = 496 ms id = 16
thread id : 0x70000a3d5000 x = 321 y = 308 sleep time = 610 ms id = 17
thread id : 0x70000a3d5000 x = 247 y = 256 sleep time = 629 ms id = 18
thread id : 0x70000a3d5000 x = 186 y = 484 sleep time = 703 ms id = 19
get result from thread  index = 20
get result from thread  index = 21
get result from thread  index = 22
get result from thread  index = 23
get result from thread  index = 24
get result from thread  index = 25
get result from thread  index = 26
get result from thread  index = 27
get result from thread  index = 28
get result from thread  index = 29
get result from thread  index = 30
get result from thread  index = 31
get result from thread  index = 32
get result from thread  index = 33
get result from thread  index = 34
get result from thread  index = 35
get result from thread  index = 36
get result from thread  index = 37
get result from thread  index = 38
get result from thread  index = 39
get result from thread  index = 40
get result from thread  index = 41
get result from thread  index = 42
get result from thread  index = 43
get result from thread  index = 44
get result from thread  index = 45
get result from thread  index = 46
get result from thread  index = 47
get result from thread  index = 48
get result from thread  index = 49
e
thread pool is alreay shutdown.
test finish, OY!
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容