Linux系统编程12:线程池编程

1. 概念

安检
银行柜台
  • 为什么使用线程池?
    频繁创建和销毁线程浪费CPU资源
  • 线程是什么?
    一堆线程放在一个池子里统一管理

2. 构成

线程池

2.1 任务队列job_queue

  • 作用
    存放待处理的任务
  • 成员
No. 构成 接口
1 处理函数 void *(*)(void*)
2 参数 void *arg
3 队列指针 struct job_queue* pnext

2.2 工作线程worker

  • 作用
    处理任务

2.3 线程池thread_pool

  • 作用
    管理多个线程并提供任务队列的接口

  • 成员

No. 构成 接口
1 初始化 threadpool_init()
2 销毁 threadpool_destroy()
3 添加任务 threadpool_add_job()

3. 流程

使用流程

  1. 初始化线程池
  2. 向线程池添加任务
  3. 销毁线程池

线程池初始化

  1. 初始化任务队列和工作线程组
  2. 将等候在条件变量(任务队列上有任务)上的一个线程唤醒并从该任务队列中取出第一个任务给该线程执行
  3. 等待任务队列中所有任务执行完毕

4. 实例

  • Linux C语言实现
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
 
struct job_queue{
    void* (*func)(void* arg);
    void* arg;
    struct job_queue* pnext;
};
 
struct threadpool{
    struct job_queue* phead;
    struct job_queue* ptail;
    pthread_t* pworks; 
    size_t nthread;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int destroy;
};
 
void* threadpool_routine(void* arg){
    assert(NULL != arg);
    struct threadpool* pthpool = (struct threadpool*)arg;
    for(;;){
        struct job_queue* pjob = NULL;
        pthread_mutex_lock(&(pthpool->mutex));
        while(pthpool->phead == NULL && 0 == pthpool->destroy){
            pthread_cond_wait(&(pthpool->cond),&(pthpool->mutex));
        }

        // 
        if(1 == pthpool->destroy){
            printf("%lu exit\n",pthread_self());
            pthread_mutex_unlock(&(pthpool->mutex));
            pthread_exit(NULL);
        }
        
        pjob = pthpool->phead;
        pthpool->phead = pjob->pnext;
        pthread_mutex_unlock(&(pthpool->mutex));
        pjob->func(pjob->arg);
        free(pjob);
    }
}
 
struct threadpool* threadpool_init(size_t nthread){
    struct threadpool* pthpool = malloc(sizeof(struct threadpool));
 
    pthread_mutex_init(&(pthpool->mutex),NULL);
    pthread_cond_init(&(pthpool->cond),NULL);
     
    pthpool->phead = NULL;
    pthpool->ptail = NULL;
    pthpool->nthread = nthread;
    pthpool->destroy = 0;
 
    // 初始化工作线程组
    pthpool->pworks = malloc(nthread * sizeof(pthread_t));
    int i;
    for(i=0;i<nthread;i++){
        pthread_create(&(pthpool->pworks[i]),NULL,threadpool_routine,pthpool);
    }
 
    return pthpool;
}
void threadpool_destroy(struct threadpool* pthpool){
    assert(NULL != pthpool);
 
 
    while(NULL != pthpool->phead){
        struct job_queue* pnext = pthpool->phead->pnext;
        free(pthpool->phead);
        pthpool->phead = pnext;
    }
 
    pthpool->destroy = 1;
    pthread_cond_broadcast(&(pthpool->cond));
    int i;
    for(i=0;i<pthpool->nthread;i++){
        pthread_join(pthpool->pworks[i],NULL);
    }
     
    free(pthpool->pworks);
    free(pthpool);
 
    pthread_mutex_destroy(&(pthpool->mutex));
    pthread_cond_destroy(&(pthpool->cond));
    pthpool = NULL;
}
void threadpool_add_job(struct threadpool* pthpool,void*(*func)(void*),void* arg){
    assert(NULL != pthpool);
    struct job_queue* jq = malloc(sizeof(struct job_queue));
    jq->func = func;
    jq->arg = arg;
    jq->pnext = NULL;
 
    printf("add job %d\n",(int) arg);
    pthread_mutex_lock(&(pthpool->mutex));
    if(pthpool->phead == NULL){
        pthpool->phead = jq;
        pthpool->ptail = jq;
    }else{
        pthpool->ptail->pnext = jq;
        pthpool->ptail = jq;
    }
    pthread_mutex_unlock(&(pthpool->mutex));
    pthread_cond_signal(&(pthpool->cond));
}
     
void* test(void* arg){
    printf("%lu do job %d\n",pthread_self(),(int)arg);
    sleep(1);
}
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s\n <#nthread> <#njob>", argv[0]);
        return 1;
    }

    int nthread = atoi(argv[1]);
    int njob = atoi(argv[2]);

    // 1. 初始化线程池
    struct threadpool* pool = threadpool_init(nthread);

    // 2. 向线程池添加任务
    int i;
    for(i=0;i<njob;i++){
        threadpool_add_job(pool,test,(void*)i);
    }
    pause();

    // 3. 销毁线程池
    threadpool_destroy(pool);
}

注意:任务队列初始化在工作线程组前。

  • Linux C++98实现
#include <pthread.h>
#include <vector>
#include <queue>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>

using namespace std;

class ThreadPool{
    typedef void (*func_t)(int);
public:
    ThreadPool(size_t count):destroy(false){

        // 初始化互斥锁和条件变量
        pthread_mutex_init(&mutex,NULL);
        pthread_cond_init(&cond,NULL);

        // 初始化线程组
        for(int i=0;i<count;i++){
            pthread_t tid;
            pthread_create(&tid,NULL,reinterpret_cast<void*(*)(void*)>(&ThreadPool::Route),this);
            threads.push_back(tid);
        }        
    }
    ~ThreadPool(){
        // 通知线程退出
        destroy = true;
        pthread_cond_broadcast(&cond);
        for(vector<pthread_t>::iterator it = threads.begin();it != threads.end();it++){
            pthread_join(*it,NULL);
        }

        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }
    void AddJob(func_t func,int arg){
        pthread_mutex_lock(&mutex);
        tasks.push(func);
        args.push(arg);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
private:
    static void Route(ThreadPool* pPool){
        for(;;){
            pthread_mutex_lock(&(pPool->mutex));
            // 如果没有任务等待
            while(pPool->tasks.empty() && !pPool->destroy){
                pthread_cond_wait(&(pPool->cond),&(pPool->mutex));
            }

            // 线程退出
            if(pPool->destroy){
                pthread_mutex_unlock(&(pPool->mutex));
                break;
            }

            // 获取任务
            func_t task = pPool->tasks.front();
            pPool->tasks.pop();
            int arg = pPool->args.front();
            pPool->args.pop();
            pthread_mutex_unlock(&(pPool->mutex));

            // 执行任务
            task(arg);
        }
    }
private:
    vector<pthread_t> threads;  ///< 线程组
    queue<func_t> tasks;        ///< 任务队列
    queue<int> args;            ///< 参数队列
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool destroy;           ///< 线程池销毁标志
};

void test(int arg){
    printf("%lu do job %d\n",pthread_self(),arg);
    sleep(1);
}

int main(int argc,char** argv){
    if(3 != argc){
        printf("usage:%s\n <#nthread> <#njob>", argv[0]);
        return 1;
    }

    // 1. 初始化线程池
    int nthread = atoi(argv[1]);
    int njob = atoi(argv[2]);

    ThreadPool pool(nthread);

    // 2. 向线程池添加任务
    for(int i=0;i<njob;i++){
        pool.AddJob(test,i);
    }
    pause();
    return 0;
}
  • C++11实现
#include <thread>
#include <vector>
#include <queue>
#include <cstdio>
#include <cstdlib>
#include <condition_variable>
#include <mutex>
#include <functional>
#include <chrono>
#include <iostream>

using namespace std;

class ThreadPool{
    typedef function<void(int)> func_t;
public:
    ThreadPool(size_t count):destroy(false){
        // 初始化线程组
        for(int i=0;i<count;i++){
            threads.emplace_back(&ThreadPool::Route,this);
        }        
    }
    ~ThreadPool(){
        // 通知线程退出
        destroy = true;
        cond.notify_all();
        for(auto& th:threads){
            th.join();
        }
    }
    void AddJob(func_t func,int arg){
        mtx.lock();
        tasks.push(func);
        args.push(arg);
        cond.notify_one();
        mtx.unlock();
    }
private:
    static void Route(ThreadPool* pPool){
        for(;;){
            unique_lock<mutex> lck(pPool->mtx);
            // 如果没有任务等待
            while(pPool->tasks.empty() && !pPool->destroy){
                pPool->cond.wait(lck);
            }

            // 线程退出
            if(pPool->destroy){
                break;
            }

            // 获取任务
            func_t task = pPool->tasks.front();
            pPool->tasks.pop();
            int arg = pPool->args.front();
            pPool->args.pop();
            lck.unlock();
            
            // 执行任务
            task(arg);
        }
    }
private:
    vector<thread> threads;  ///< 线程组
    queue<func_t> tasks;     ///< 任务队列
    queue<int> args;         ///< 参数队列
    mutex mtx;
    condition_variable cond;
    bool destroy;            ///< 线程池销毁标志
};

void test(int arg){
    cout << this_thread::get_id() << " do job " << arg << endl;
    this_thread::sleep_for(chrono::seconds(1));
}

int main(int argc,char** argv){
    if(3 != argc){
        printf("usage:%s\n <#nthread> <#njob>", argv[0]);
        return 1;
    }

    // 1. 初始化线程池
    int nthread = atoi(argv[1]);
    int njob = atoi(argv[2]);

    ThreadPool pool(nthread);

    // 2. 向线程池添加任务
    for(int i=0;i<njob;i++){
        pool.AddJob(test,i);
    }
    
    this_thread::sleep_for(chrono::seconds(10));

    return 0;
}

C++11 lambda表达式实现

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <functional>
using namespace std;
 
class ThreadPool{
    vector<thread> threads;
    queue<function<void()>> tasks;
    condition_variable cond;
    mutex m;
    bool exited = false;
public:
    ThreadPool(int num){
    auto routine = [this](){
        while(true){
            unique_lock<mutex> lock(m);
            while(tasks.empty() && !exited) cond.wait(lock);
            if(exited) break;
            auto func = tasks.front();
            tasks.pop();
            lock.unlock();
            func();
       }            
    };
        for(int i=0;i<num;++i){
        // thread t(routine);
        // threads.emplace_back(move(t));
        // threads.emplace_back(thread(routine));
        threads.emplace_back(thread{routine});
    }
    }
    ~ThreadPool(){
        exited = true;
    cond.notify_all();
    for(auto& thread:threads){
        thread.join();
    }
    }
    ThreadPool(const ThreadPool&) = default;
    ThreadPool& operator=(const ThreadPool&) = default;
 
    void AddTask(function<void()> func){
     lock_guard<mutex> guard(m);
     tasks.push(func);
     cond.notify_one();
         // signal
    }
 
};
 
int main(){
    mutex m;
    auto test = [&m](){
    this_thread::sleep_for(2s);
    lock_guard<mutex> guard(m);
        cout << this_thread::get_id() << " do something..." << endl;
    };
 
    ThreadPool pool(5);
 
    for(int i=0;i<15;++i){
        pool.AddTask(test);
    }
    this_thread::sleep_for(10s);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容