使用C++11实现一个半同步半异步的线程池

技术原理
1、SyncQueue端是同步的,同步丢任务到同步队列
2、ThreadPool端是异步的,使用若干线程并发的从线程池里面取任务,执行任务
3、SyncQueue端使用条件变量和互斥体支持同步,避免同时读写list
4、ThreadPool端使用atomic_bool支持多线程下的bool变量的原子性,类似跨线程的唯一标识,标识线程是否处于运行状态
5、使用RAII技术,在析构函数中释放线程资源。为避免重复释放,使用std::call_once语义保证只执行一遍

程序代码如下
CMakeLists.txt, 准确的说这个CMakeLists中的Boost库可以去掉,因为用的都是std

cmake_minimum_required(VERSION 2.6)
project(main)

add_definitions(-std=c++11)

aux_source_directory(. CPP_LIST)

find_package(Boost REQUIRED COMPONENTS 
    system
    filesystem
    )

include_directories(${Boost_INCLUDE_DIRS})


add_executable(main ${CPP_LIST})

target_link_libraries(main ${Boost_LIBRARIES})
target_link_libraries(main pthread)

sync_queue.hpp

#ifndef _SYNC_QUEUE_HPP_
#define _SYNC_QUEUE_HPP_

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

using namespace std;

template <typename T> 
class SyncQueue {
    public:
        
        SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) {}

        void Put(const T& x) {
            Add(x);
        }

        void Put(T&& x) {
            Add(std::forward<T>(x));
        }
        
        void Take(std::list<T>& list) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });

            if(m_needStop) {
                return;
            }

            list = std::move(m_queue);
            m_NotFull.notify_one();
        }


        void Take(T& t) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });
            
            if(m_needStop) {
                return;
            }

            t = m_queue.front();
            m_queue.pop_front();
            m_NotFull.notify_one();
        }

        void Stop() {
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                m_needStop = true;
            }

            m_NotFull.notify_all();
            m_NotEmpty.notify_all();
        }
        
        bool Empty() {
            std::lock_guard<std::mutex> locker;
            return m_queue.empty();
        }

        bool Full() {
            std::lock_guard<std::mutex> locker;
            return m_queue.size() == m_maxSize;
        }
    
    private:
        
        // 判断队列未满,内部使用的无锁版,否则会发生死锁
        bool NotFull() const {
            bool full = m_queue.size() >= m_maxSize;
            if(full) {
                std::cerr << "full, waiting, thread_id: " << std::this_thread::get_id() << std::endl;
            }
            return !full;
        }

        bool NotEmpty() const {
            bool empty = m_queue.empty();
            if(empty) {
                std::cerr << "empty, waiting, thread_id: " << std::this_thread::get_id() << std::endl;
            }
            return !empty;
        }

        template <typename F> 
        void Add(F&& x) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotFull.wait(locker, [this]{ return m_needStop || NotFull(); });
            
            if(m_needStop) {
                return;
            }

            m_queue.push_back(std::forward<F>(x));
            m_NotEmpty.notify_one();
        }

    private:
        std::list<T> m_queue; // 缓冲区
        std::mutex m_mutex; // 互斥量和条件变量结合起来使用
        std::condition_variable m_NotEmpty; // 不为空的条件变量
        std::condition_variable m_NotFull; // 不为满的条件变量
        int m_maxSize; // 同步队列的最大大小
        bool m_needStop; // 停止标志
};
#endif

thread_pool.hpp

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#include "sync_queue.hpp"

#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>

// 定义同步队列中最大的任务数
const int MaxTaskCount = 100;

class ThreadPool {
    public:
        using Task = std::function<void()>;

        ThreadPool(int numThreads=std::thread::hardware_concurrency()): m_queue(MaxTaskCount) {
            Start(numThreads);
        }

        ~ThreadPool() {
            Stop();
        }
        
        // 析构函数中停止所有线程组中线程的函数
        void Stop() {
            // 保证多线程环境下只调用一次StopThreadGroup
            std::call_once(m_flag, [this]{ StopThreadGroup();  });
        }

        void AddTask(Task&& task) {
            m_queue.Put(std::forward<Task>(task));
        }

        void AddTask(const Task& task) {
            m_queue.Put(task);
        }



    private:
        
        void Start(int numThreads) {
            m_running = true;

            for(int i=0; i<numThreads; ++i) {
                m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
            }
        }
        
        // 真正执行Task的函数
        void RunInThread() {
            while(m_running) {
                // 取任务并执行
                std::list<Task> list;
                m_queue.Take(list);

                for(auto& task: list) {
                    if(!m_running) {
                        return;
                    }

                    task();
                }
            }
        }

        void StopThreadGroup() {
            m_queue.Stop(); // 让同步队列中的线程停止
            m_running = false;   // 置为false,让内部线程跳出循环并退出

            for(auto& thread: m_threadgroup) {
                if(thread) {
                    thread->join(); 
                }
            }

            m_threadgroup.clear();
        }


        std::list<std::shared_ptr<std::thread> > m_threadgroup; // 处理任务的线程组
        SyncQueue<Task> m_queue;   // 同步队列
        std::atomic_bool m_running; // 是否停止的标志
        std::once_flag m_flag;     
};

#endif

main.cpp

#include "thread_pool.hpp"

#include <iostream>
#include <fstream>
#include <chrono>
#include <string>


int main() {

    ThreadPool pool;

    for(int i=0; i<20; ++i) {
        pool.AddTask(
                    [i] ()  {
                        std::string fname = std::to_string(i);
                        fname += ".txt";

                        std::ofstream ofs(fname, std::ios::out | std::ios::app);

                        ofs << std::this_thread::get_id() << ": ["<< i << "]" << std::endl;

                        ofs.close();
                    }
                );
    }
    
    // 真正的服务器程序是不会退出的,这里不用sleep  
    std::this_thread::sleep_for(std::chrono::seconds(2));
}

程序输出如下


图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容

  • 0.为什么需要线程池? 当我们需要完成一些持续时间短、发生频率高的工作时,每次为他们开启一个线程既显得繁琐又会造成...
    琼蘂无徵朝霞难挹阅读 1,261评论 0 3
  • github地址:https://github.com/progschj/ThreadPool 行38 vecto...
    姬权阅读 218评论 0 0
  • 什么是线程池 线程池(thread pool)是一种线程使用模式。线程过多或者频繁创建和销毁线程会带来调度开销,进...
    快乐小吧阅读 3,925评论 0 1
  • 本文根据众多互联网博客内容整理后形成,引用内容的版权归原始作者所有,仅限于学习研究使用,不得用于任何商业用途。 互...
    深红的眼眸阅读 1,100评论 0 0
  • 推荐指数: 6.0 书籍主旨关键词:特权、焦点、注意力、语言联想、情景联想 观点: 1.统计学现在叫数据分析,社会...
    Jenaral阅读 5,719评论 0 5