使用C++11实现一个半同步半异步的线程池

技术原理
1、SyncQueue端是同步的,同步丢任务到同步队列
2、ThreadPool端是异步的,使用若干线程并发的从线程池里面取任务,执行任务
3、SyncQueue端使用条件变量和互斥体支持同步,避免同时读写list
4、ThreadPool端使用atomic_bool支持多线程下的bool变量的原子性,类似跨线程的唯一标识,标识线程是否处于运行状态
5、使用RAII技术,在析构函数中释放线程资源。为避免重复释放,使用std::call_once语义保证只执行一遍

程序代码如下
CMakeLists.txt, 准确的说这个CMakeLists中的Boost库可以去掉,因为用的都是std

cmake_minimum_required(VERSION 2.6)
project(main)

add_definitions(-std=c++11)

aux_source_directory(. CPP_LIST)

find_package(Boost REQUIRED COMPONENTS 
    system
    filesystem
    )

include_directories(${Boost_INCLUDE_DIRS})


add_executable(main ${CPP_LIST})

target_link_libraries(main ${Boost_LIBRARIES})
target_link_libraries(main pthread)

sync_queue.hpp

#ifndef _SYNC_QUEUE_HPP_
#define _SYNC_QUEUE_HPP_

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

using namespace std;

template <typename T> 
class SyncQueue {
    public:
        
        SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) {}

        void Put(const T& x) {
            Add(x);
        }

        void Put(T&& x) {
            Add(std::forward<T>(x));
        }
        
        void Take(std::list<T>& list) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });

            if(m_needStop) {
                return;
            }

            list = std::move(m_queue);
            m_NotFull.notify_one();
        }


        void Take(T& t) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });
            
            if(m_needStop) {
                return;
            }

            t = m_queue.front();
            m_queue.pop_front();
            m_NotFull.notify_one();
        }

        void Stop() {
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                m_needStop = true;
            }

            m_NotFull.notify_all();
            m_NotEmpty.notify_all();
        }
        
        bool Empty() {
            std::lock_guard<std::mutex> locker;
            return m_queue.empty();
        }

        bool Full() {
            std::lock_guard<std::mutex> locker;
            return m_queue.size() == m_maxSize;
        }
    
    private:
        
        // 判断队列未满,内部使用的无锁版,否则会发生死锁
        bool NotFull() const {
            bool full = m_queue.size() >= m_maxSize;
            if(full) {
                std::cerr << "full, waiting, thread_id: " << std::this_thread::get_id() << std::endl;
            }
            return !full;
        }

        bool NotEmpty() const {
            bool empty = m_queue.empty();
            if(empty) {
                std::cerr << "empty, waiting, thread_id: " << std::this_thread::get_id() << std::endl;
            }
            return !empty;
        }

        template <typename F> 
        void Add(F&& x) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotFull.wait(locker, [this]{ return m_needStop || NotFull(); });
            
            if(m_needStop) {
                return;
            }

            m_queue.push_back(std::forward<F>(x));
            m_NotEmpty.notify_one();
        }

    private:
        std::list<T> m_queue; // 缓冲区
        std::mutex m_mutex; // 互斥量和条件变量结合起来使用
        std::condition_variable m_NotEmpty; // 不为空的条件变量
        std::condition_variable m_NotFull; // 不为满的条件变量
        int m_maxSize; // 同步队列的最大大小
        bool m_needStop; // 停止标志
};
#endif

thread_pool.hpp

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#include "sync_queue.hpp"

#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>

// 定义同步队列中最大的任务数
const int MaxTaskCount = 100;

class ThreadPool {
    public:
        using Task = std::function<void()>;

        ThreadPool(int numThreads=std::thread::hardware_concurrency()): m_queue(MaxTaskCount) {
            Start(numThreads);
        }

        ~ThreadPool() {
            Stop();
        }
        
        // 析构函数中停止所有线程组中线程的函数
        void Stop() {
            // 保证多线程环境下只调用一次StopThreadGroup
            std::call_once(m_flag, [this]{ StopThreadGroup();  });
        }

        void AddTask(Task&& task) {
            m_queue.Put(std::forward<Task>(task));
        }

        void AddTask(const Task& task) {
            m_queue.Put(task);
        }



    private:
        
        void Start(int numThreads) {
            m_running = true;

            for(int i=0; i<numThreads; ++i) {
                m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
            }
        }
        
        // 真正执行Task的函数
        void RunInThread() {
            while(m_running) {
                // 取任务并执行
                std::list<Task> list;
                m_queue.Take(list);

                for(auto& task: list) {
                    if(!m_running) {
                        return;
                    }

                    task();
                }
            }
        }

        void StopThreadGroup() {
            m_queue.Stop(); // 让同步队列中的线程停止
            m_running = false;   // 置为false,让内部线程跳出循环并退出

            for(auto& thread: m_threadgroup) {
                if(thread) {
                    thread->join(); 
                }
            }

            m_threadgroup.clear();
        }


        std::list<std::shared_ptr<std::thread> > m_threadgroup; // 处理任务的线程组
        SyncQueue<Task> m_queue;   // 同步队列
        std::atomic_bool m_running; // 是否停止的标志
        std::once_flag m_flag;     
};

#endif

main.cpp

#include "thread_pool.hpp"

#include <iostream>
#include <fstream>
#include <chrono>
#include <string>


int main() {

    ThreadPool pool;

    for(int i=0; i<20; ++i) {
        pool.AddTask(
                    [i] ()  {
                        std::string fname = std::to_string(i);
                        fname += ".txt";

                        std::ofstream ofs(fname, std::ios::out | std::ios::app);

                        ofs << std::this_thread::get_id() << ": ["<< i << "]" << std::endl;

                        ofs.close();
                    }
                );
    }
    
    // 真正的服务器程序是不会退出的,这里不用sleep  
    std::this_thread::sleep_for(std::chrono::seconds(2));
}

程序输出如下


图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 0.为什么需要线程池? 当我们需要完成一些持续时间短、发生频率高的工作时,每次为他们开启一个线程既显得繁琐又会造成...
    琼蘂无徵朝霞难挹阅读 1,271评论 0 3
  • github地址:https://github.com/progschj/ThreadPool 行38 vecto...
    姬权阅读 225评论 0 0
  • 什么是线程池 线程池(thread pool)是一种线程使用模式。线程过多或者频繁创建和销毁线程会带来调度开销,进...
    快乐小吧阅读 3,953评论 0 1
  • 本文根据众多互联网博客内容整理后形成,引用内容的版权归原始作者所有,仅限于学习研究使用,不得用于任何商业用途。 互...
    深红的眼眸阅读 1,138评论 0 0
  • 推荐指数: 6.0 书籍主旨关键词:特权、焦点、注意力、语言联想、情景联想 观点: 1.统计学现在叫数据分析,社会...
    Jenaral阅读 5,753评论 0 5