线程池

比起普通的多线程服务器,线程池的效率更高,因为线程的创建和销毁是非常耗性能的.所以线程池就是创建若干个工作线程和一个管理者线程,在任务量大的时候,管理者线程会新建工作线程,在任务量小的时候会销毁空闲的工作线程,而不是每次线程执行完后就立马销毁.

threadpool.c

#include <stdlib.h>

#include <pthread.h>

#include <unistd.h>

#include <assert.h>

#include <stdio.h>

#include <string.h>

#include <errno.h>

#include <signal.h>

#include <threadpool.h>

#define DEFAULT_TIME 10

//任务队列中等待的任务大于该值时,添加新的线程到线程池

#define MIN_WAIT_TASK_NUM 10

//每次创建和销毁的线程数量

#define DEFAULT_THREAD_VARY 10

#define true 1

#define false 0

//任务结构体

typedef struct {

    void* (*function)(void*);

    void* arg;

}threadpool_task_t;

//线程池结构体

struct threadpool_t{

    pthread_mutex_t lock;

    pthread_mutex_t busy_lock;

    pthread_cond_t queue_not_full;

    pthread_cond_t queue_not_empty;

    pthread_t *tids;

    pthread_t admin_id;

    threadpool_task_t *task_queue; //任务队列

    int min_thr_num;

    int max_thr_num;

    int live_thr_num; //当前存活线程个数

    int busy_thr_num; //当前繁忙线程个数

    int destroy_thr_num; //要销毁的线程个数

    int queue_front;

    int queue_rear;

    int queue_size;

    int queue_max_size;

    int shutdown;

};

void* worker_fun(void* threadpool);

void* admin_fun(void* threadpool);

int is_thread_alive(pthread_t tid);

int threadpool_free(threadpool_t* pool);

threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){

    int i;

    printf("threadpool_create start==\n");

    threadpool_t *pool = NULL;

    do{

        if((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL){

        printf("malloc threadpool failed\n");

        break;

    }

    pool->min_thr_num = min_thr_num;

    pool->max_thr_num = max_thr_num;

    pool->busy_thr_num = 0;

    pool->live_thr_num = min_thr_num;

    pool->queue_size = 0;

    pool->queue_max_size = queue_max_size;

    pool->queue_front = 0;

    pool->queue_rear = 0;

    pool->shutdown = false;

    //根据最大线程个数开辟线程空间

    pool->tids = (pthread_t*)malloc(sizeof(pthread_t)*max_thr_num);

    if(pool->tids == NULL){

    printf("malloc threads error\n");

    break;

}

memset(pool->tids, 0, sizeof(pthread_t) * max_thr_num);

//任务队列开辟空间

pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_t)*queue_max_size);

if(pool->task_queue == NULL){

    printf("malloc task_queue error\n");

    break;

}

//初始化互斥锁,条件变量

if(pthread_mutex_init(&(pool->lock),NULL) != 0

|| pthread_mutex_init(&(pool->busy_lock), NULL) != 0

|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0

|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)

{

    printf("init lock or cond failed\n");

    break;

}

//创建若干工作线程 和 一个管理线程

for(i = 0; i < min_thr_num; i++){

    pthread_create(&(pool->tids[i]), NULL, worker_fun, (void*)pool);

    printf("thread[%d] %d created\n", i, (unsigned int)pool->tids[i]);

}

pthread_create(&(pool->admin_id), NULL, admin_fun, (void*)pool);

return pool;

}while(0);

threadpool_free(pool);

return NULL;

}

//向线程池中添加一个任务

int threadpool_add(threadpool_t *pool, void*(*function)(void*), void* arg)

{

    pthread_mutex_lock(&(pool->lock));

    while((pool->queue_size == pool->queue_max_size) &&

    !(pool->shutdown)){

    pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));

}

if(pool->shutdown){

    pthread_mutex_unlock(&(pool->lock));

}

//清空 工作线程 调用的回调函数 的参数arg

if (pool->task_queue[pool->queue_rear].arg != NULL) {

        free(pool->task_queue[pool->queue_rear].arg);

        pool->task_queue[pool->queue_rear].arg = NULL;

    }

    //添加任务到任务队列

    pool->task_queue[pool->queue_rear].function = function;

    pool->task_queue[pool->queue_rear].arg = arg;

    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;      /* 队尾指针移动, 模拟环形 */

    pool->queue_size++;

    //唤醒工作线程

    pthread_cond_signal(&(pool->queue_not_empty));

    pthread_mutex_unlock(&(pool->lock));

    return 0;

}

//工作线程

void* worker_fun(void* threadpool)

{

    threadpool_t *pool = (threadpool_t*)threadpool;

    threadpool_task_t task;

    while(true){

        pthread_mutex_lock(&(pool->lock));

        while((pool->queue_size == 0) && (!pool->shutdown)){

        printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());

        //释放pool->lock锁,并阻塞在pool->queue_not_empty条件变量上

        pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

       //清除指定数目的空闲线程

        if(pool->destroy_thr_num > 0){

            pool->destroy_thr_num--;

            //如果线程个数大于线程池的最小线程个数,则结束线程

            if(pool->live_thr_num > pool->min_thr_num){

                printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());

                pool->live_thr_num--;

                pthread_mutex_unlock(&(pool->lock));//释放锁

                pthread_exit(NULL);

        }

    }

}

//如果shutdown为true,则关闭线程池中的所有线程

if(pool->shutdown){

    pthread_mutex_unlock(&(pool->lock));

    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());

    pthread_exit(NULL);

}

//正常情况:从任务队列中取任务

task.function = pool->task_queue[pool->queue_front].function;

task.arg = pool->task_queue[pool->queue_front].arg;

pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;

pool->queue_size--;

//通知可以有新的任务添加进来

pthread_cond_broadcast(&(pool->queue_not_full));

pthread_mutex_unlock(&(pool->lock));

//执行任务

printf("thread 0x%x is working\n", (unsigned int)pthread_self());

pthread_mutex_lock(&(pool->busy_lock));

pool->busy_thr_num++;

pthread_mutex_unlock(&(pool->busy_lock));

if(task.arg == NULL){

    printf("error: task arg is NULL\n");

}

(*(task.function))(task.arg);

//执行任务结束

printf("thread 0x%x end working\n",(unsigned int)pthread_self());

pthread_mutex_lock(&(pool->busy_lock));

pool->busy_thr_num--;

pthread_mutex_unlock(&(pool->busy_lock));

}

pthread_exit(NULL);

}

//管理线程

void* admin_fun(void* threadpool)

{

    int i;

    threadpool_t *pool = (threadpool_t *)threadpool;

    while(!pool->shutdown){

        sleep(DEFAULT_TIME);

        pthread_mutex_lock(&(pool->lock));

        int queue_size = pool->queue_size;

        int live_thr_num = pool->live_thr_num;

        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->busy_lock));

        int busy_thr_num = pool->busy_thr_num;

        pthread_mutex_unlock(&(pool->busy_lock));

        //创建新线程

        if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){

            pthread_mutex_lock(&(pool->lock));

            int add = 0;

            for(i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY

            && pool->live_thr_num < pool->max_thr_num; i++){

            if(pool->tids[i] == 0 || !is_thread_alive(pool->tids[i])){

                pthread_create(&(pool->tids[i]), NULL, worker_fun, (void*)pool);

                add++;

                pool->live_thr_num++;

        }

    }

    pthread_mutex_unlock(&(pool->lock));

}

//销毁空闲线程

if((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num){

    //一次性销毁 DEFAULT_THREAD 个线程

    pthread_mutex_lock(&(pool->lock));

    pool->destroy_thr_num = DEFAULT_THREAD_VARY;

    pthread_mutex_unlock(&(pool->lock));

    for(i = 0; i < DEFAULT_THREAD_VARY; i++){

        //通知工作线程,它们会自行终止

        pthread_cond_signal(&(pool->queue_not_empty));

    }

}

}

return NULL;

}

int threadpool_destroy(threadpool_t *pool)

{

    printf("threadpool_destroy start==\n");

    int i;

    if(pool == NULL){

    return -1;

}

pool->shutdown = true;

//先销毁管理线程

pthread_join(pool->admin_id, NULL);

//通知工作线程

for(i = 0; i < pool->live_thr_num; i++){

    pthread_cond_broadcast(&(pool->queue_not_empty));

}

for(i = 0; i < pool->live_thr_num; i++){

    pthread_join(pool->tids[i], NULL);

}

threadpool_free(pool);

printf("threadpool_destroy end==\n");

return 0;

}

int threadpool_free(threadpool_t *pool)

{

    printf("threadpool_free start==\n");

    if(pool == NULL){

        return -1;

    }

    if(pool->task_queue){

        free(pool->task_queue);

    }

    if(pool->tids){

        free(pool->tids);

        pthread_mutex_lock(&(pool->lock));

        pthread_mutex_destroy(&(pool->lock));

        pthread_mutex_lock(&(pool->busy_lock));

        pthread_mutex_destroy(&(pool->busy_lock));

        pthread_cond_destroy(&(pool->queue_not_empty));

        pthread_cond_destroy(&(pool->queue_not_full));

    }

    free(pool);

    pool = NULL;

    printf("threadpool_free end==\n");

    return 0;

}

int threadpool_all_threadnum(threadpool_t *pool)

{

    int all_threadnum = 0;

    pthread_mutex_lock(&(pool->lock));

    all_threadnum = pool->live_thr_num;

    pthread_mutex_unlock(&(pool->lock));

    return all_threadnum;

}

int threadpool_busy_threadnum(threadpool_t *pool)

{

    int busy_threadnum = 0;

    pthread_mutex_lock(&(pool->busy_lock));

    busy_threadnum = pool->busy_thr_num;

    pthread_mutex_unlock(&(pool->busy_lock));

    return busy_threadnum;

}

int is_thread_alive(pthread_t tid)

{

    //0号信号,检测线程状态

    int kill_rc = pthread_kill(tid, 0);

    if(kill_rc == ESRCH){

        return false;

}

return true;

}

//测试

#if 1

void* process(void* arg)

{

    printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),*(int *)arg);

    sleep(1);

    printf("task %d is end\n",*(int *)arg);

}

int main(void)

{

    threadpool_t *pool = threadpool_create(3, 100, 100);

    printf("pool inited\n");

    int num[20], i;

    for(i = 0; i < 20; i++){

        num[i] = i;

        printf("add task %d\n",i);

        threadpool_add(pool, process, (void*)&num[i]);

    }

    sleep(10);

    for(i = 0; i < 20; i++){

        printf("%3d ", num[i]);

    }

    threadpool_destroy(pool);

    return 0;

}

#endif

完毕!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,137评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,824评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,465评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,131评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,140评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,895评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,535评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,435评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,952评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,081评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,210评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,896评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,552评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,089评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,198评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,531评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,209评论 2 357

推荐阅读更多精彩内容