线程池

利用 C++11 的条件变量（std::condition_variable），简单实现一个线程池

#pragma once
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include <xtr1common>

// Number of worker threads that MyThreadPool::Start() creates.
#define  MAX_THREAD_NUMBERS  3
// Priority attached to a queued task. MyThreadPool's queue comparator hands
// out the entry with the numerically smallest level first, so level_0 is
// served before level_1, and level_1 before level_2.
enum threadpriorityleavel
{
    level_0 = 0,
    level_1 = 1,
    level_2 = 2
};

// A unit of work: any callable taking no arguments and returning nothing.
using Task = std::function<void()>;
// A task together with the priority it is queued at.
using TaskPair = std::pair<threadpriorityleavel, Task>;

// Fixed-size worker pool that runs queued Task callables on background
// threads.
//
// Pending work is kept in a priority queue keyed by threadpriorityleavel; the
// entry with the smallest level value is handed to a worker first. Ordering
// between tasks of the same level is whatever std::priority_queue yields and
// is not guaranteed to be FIFO. The pool cannot be copied.
class MyThreadPool
{
public:
    MyThreadPool();
    // Calls Stop() if the pool is still flagged as started.
    ~MyThreadPool();

    // Flags the pool as started and spawns MAX_THREAD_NUMBERS worker threads.
    void Start();
    // Clears the started flag, wakes every waiting worker, then joins and
    // frees all worker threads.
    void Stop();
    // Queues a task at level_2.
    void AddTask(const Task&);
    // Queues a task at the priority carried in the pair.
    void AddTask(const TaskPair&);

    MyThreadPool(const MyThreadPool&) = delete;
    MyThreadPool& operator=(const MyThreadPool&) = delete;
private:
    // Ordering for the task queue. Reports task1 as "lower" when its level
    // value is larger, which puts the smallest level at the top of the
    // std::priority_queue.
    class comparefunc {
    public:
        // const so the comparator can be invoked through a const object.
        bool operator()(const TaskPair& task1, const TaskPair& task2) const {
            return task1.first > task2.first;
        }
    };

    // Body executed by every worker thread.
    void threadloop();
    // Blocks until a task is available or the pool is stopped; returns an
    // empty Task in the latter case.
    Task taketask();

    // Worker threads; allocated in Start(), joined and deleted in Stop().
    std::vector<std::thread*> vecThreads;
    std::priority_queue<TaskPair, std::vector<TaskPair>, comparefunc> tasks;

    // Guards `tasks` and is the mutex paired with `cv`.
    std::mutex mt;
    std::condition_variable cv;
    // True between Start() and Stop(). Atomic so that a read which does not
    // hold `mt` cannot race with the write performed in Stop().
    std::atomic<bool> is_start_{false};
};

#include "threadpool.h"
#include <mutex>
#include <iostream>

// Nothing to set up here: members rely on their own default initialization,
// and no thread is created until Start() is called.
MyThreadPool::MyThreadPool() = default;

void MyThreadPool::threadloop() {
    std::cout << "threadloop current thid=" << std::this_thread::get_id() << ", start" << std::endl;
    while (is_start_) {
        Task task = taketask();
        if (task) {
            task();
        }
    }
    std::cout << "threadloop current thid=" << std::this_thread::get_id() << ", end" << std::endl;
}

// Removes and returns the highest-priority pending task, blocking on the
// condition variable while the queue is empty and the pool is started.
// Returns an empty Task when the pool is not started (even if tasks remain
// queued). Logs a line before each wait and once after the wait loop ends.
Task MyThreadPool::taketask() {
    std::unique_lock<std::mutex> guard(mt);
    for (;;) {
        if (!tasks.empty() || !is_start_) {
            break;
        }
        std::cout << "taketask current thid=" << std::this_thread::get_id() << ", wait" << std::endl;
        cv.wait(guard);
    }
    std::cout << "taketask current thid=" << std::this_thread::get_id() << ", wake" << std::endl;

    if (tasks.empty() || !is_start_) {
        return Task();
    }

    Task next = tasks.top().second;
    tasks.pop();
    return next;
}

// Shuts the pool down on destruction, but only while it is still flagged as
// started; a pool that was never started or was already stopped is left
// untouched.
MyThreadPool::~MyThreadPool() {
    if (!is_start_) {
        return;
    }
    Stop();
}

void MyThreadPool::Start() {
    is_start_ = true;
    vecThreads.clear();
    for (int i = 0; i < MAX_THREAD_NUMBERS; i++) {
        vecThreads.push_back(new std::thread(std::bind(&MyThreadPool::threadloop, this)));
    }
}

// Stops the pool: clears the started flag under the mutex, wakes every worker
// blocked on the condition variable, then joins and frees each worker thread.
// Progress is logged before and after the wake-up.
void MyThreadPool::Stop() {
    std::cout << "MyThreadPool::Stop()" << std::endl;
    {
        std::lock_guard<std::mutex> guard(mt);
        is_start_ = false;
        cv.notify_all();
    }
    std::cout << "MyThreadPool::Stop() notify_all " << std::endl;

    for (std::thread* worker : vecThreads) {
        worker->join();
        delete worker;
    }
    vecThreads.clear();
}

void MyThreadPool::AddTask(const Task& task) {
    std::unique_lock<std::mutex> lock(mt);
    TaskPair tp(level_2, task);
    tasks.push(tp);
    cv.notify_all();
}

// Queues `tp` (priority + task) and wakes one idle worker to pick it up.
//
// Only a single task becomes available, so notify_one is enough; notify_all
// woke every waiting worker just for all but one of them to find the queue
// empty and go back to sleep. The notification is sent after the lock is
// released so the woken worker does not immediately block on `mt`.
void MyThreadPool::AddTask(const TaskPair& tp) {
    {
        std::lock_guard<std::mutex> lock(mt);
        tasks.push(tp);
    }
    cv.notify_one();
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。