C++实现简单线程池

#include <pthread.h>
#include <unistd.h>

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <queue>
#include <stdexcept>
using namespace std;
// Signature of a task entry point: it receives the opaque argument that was
// stored in the Task.
using callback = void(*)(void *);

// A unit of work: a plain function pointer plus the opaque argument handed to
// it when the task runs. The Task only stores `arg`; it never frees it.
class Task {
public:
    // func: function to invoke from run(); may be null, in which case run()
    //       does nothing.
    // arg:  pointer passed verbatim to func.
    Task(callback func, void *arg): func(func), arg(arg){}

    // Invokes the stored callback with the stored argument.
    // A null callback is skipped instead of being called through.
    void run() const {
        if (func != nullptr) {
            func(arg);
        }
    }
private:
    callback func;
    void *arg;
};

class ThreadPool {
public:
    ThreadPool(uint32_t min, uint32_t max);
    ~ThreadPool();
    void submitTask(Task *task);
    bool hasTask();
private:
    static void *manager(void *arg);
    static void *worker(void *arg);
    pthread_mutex_t queueMutex;
    pthread_cond_t notEmpty;
    queue<Task*> *taskQueue;
    pthread_t *threads;
    pthread_t manageThread;
    uint32_t minThreadNum;
    uint32_t maxThreadNum;
    bool isShutdown;
};

bool ThreadPool::hasTask()
{
    if (taskQueue->empty())
        return false;
    else
        return true;
}

// Appends `task` to the queue and wakes a worker to run it.
//
// task: heap-allocated task; the worker that runs it deletes it afterwards.
//       A null pointer is ignored, since a worker would otherwise
//       dereference it.
void ThreadPool::submitTask(Task *task)
{
    if (task == nullptr) {
        return;
    }

    pthread_mutex_lock(&queueMutex);
    taskQueue->push(task);
    pthread_mutex_unlock(&queueMutex);

    // Exactly one task was added, so waking a single waiter is sufficient;
    // a broadcast would wake every idle worker only for most of them to find
    // the queue empty again.
    pthread_cond_signal(&notEmpty);
}

void *ThreadPool::worker(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;
    Task *task;
    while(!pool->isShutdown) {
        pthread_mutex_lock(&pool->queueMutex);
        while(pool->taskQueue->empty()) {
            pthread_cond_wait(&pool->notEmpty, &pool->queueMutex);
        }
        task = pool->taskQueue->front();
        pool->taskQueue->pop();
        pthread_mutex_unlock(&pool->queueMutex);
        task->run();
        delete task;
    }
    pthread_exit(0);
}

void *ThreadPool::manager(void *arg)
{
    pthread_exit(0);
}

ThreadPool::ThreadPool(uint32_t min, uint32_t max)
{
    isShutdown = false;
    minThreadNum = min;
    maxThreadNum = max;
    pthread_mutex_init(&queueMutex, NULL);
    pthread_cond_init(&notEmpty, NULL);
    taskQueue = new queue<Task*>();
    threads = new pthread_t[maxThreadNum];
    pthread_t pthid;
    for (int i = 0; i < minThreadNum; i++) {
        pthread_create(&pthid, NULL, worker, this);
        threads[i] = pthid;
    }
    pthread_create(&manageThread, NULL, manager, NULL);
}

ThreadPool::~ThreadPool()
{
    isShutdown = true;
    pthread_cond_broadcast(&notEmpty);
    delete [] threads;
    delete taskQueue;
}

// Example task body: prints the calling thread's id and the integer that
// `arg` points to.
//
// arg: pointer to an int; it is only read. A null pointer is ignored.
void taskFunc(void* arg)
{
    if (arg == nullptr) {
        return;
    }
    const int num = *static_cast<int*>(arg);
    // NOTE(review): %lu assumes pthread_t is an unsigned long, as on
    // Linux/glibc; the previous %ld mismatched its signedness.
    printf("thread %lu is working, number = %d\n",
           pthread_self(), num);
}

// Demo driver: submits 100 tasks to a pool and waits until the queue drains.
int main()
{
    constexpr int kTaskCount = 100;
    // Task arguments live in static storage: nothing has to be freed per task,
    // and the values stay valid for any worker that is still running a task
    // when main returns.
    static int taskArgs[kTaskCount];

    // Create the thread pool. A stack object guarantees the destructor runs
    // on every exit path from main.
    ThreadPool pool(2, 4);

    for (int i = 0; i < kTaskCount; ++i) {
        taskArgs[i] = i;
        // Ownership of the Task passes to the pool; the worker that runs it
        // deletes it.
        pool.submitTask(new Task(taskFunc, &taskArgs[i]));
    }

    // hasTask() only reports tasks that are still queued, so poll until the
    // queue is empty before tearing the pool down.
    while (pool.hasTask()) {
        sleep(1);
    }
    return 0;
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容