与每来一个任务就新建一个线程的普通多线程服务器相比，线程池的效率更高，因为线程的创建和销毁开销很大。线程池预先创建若干个工作线程和一个管理者线程：任务量大时，管理者线程会新建工作线程；任务量小时，则销毁空闲的工作线程，而不是每执行完一个任务就立即销毁线程。
threadpool.c
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <threadpool.h>
// Seconds the admin thread sleeps between two load checks
#define DEFAULT_TIME 10
// The admin thread adds workers once at least this many tasks are waiting in the queue
#define MIN_WAIT_TASK_NUM 10
// Number of threads created or retired per adjustment step
#define DEFAULT_THREAD_VARY 10
// Boolean constants used for flags such as shutdown
#define true 1
#define false 0
// A queued task: the callback to run and the argument handed to it
typedef struct {
void* (*function)(void*); // task callback, invoked by a worker thread
void* arg; // argument passed to function
}threadpool_task_t;
// Thread pool state shared by the workers, the admin thread and the callers
struct threadpool_t{
pthread_mutex_t lock; // held around accesses to the task queue, live_thr_num and destroy_thr_num
pthread_mutex_t busy_lock; // held around accesses to busy_thr_num
pthread_cond_t queue_not_full; // threadpool_add waits on this while the queue is full
pthread_cond_t queue_not_empty; // workers wait on this while the queue is empty
pthread_t *tids; // worker thread ids; array of max_thr_num slots, 0 marks an unused slot
pthread_t admin_id; // id of the admin (manager) thread
threadpool_task_t *task_queue; // task queue, used as a ring buffer of queue_max_size slots
int min_thr_num; // lower bound on the number of worker threads
int max_thr_num; // upper bound on the number of worker threads
int live_thr_num; // number of worker threads currently alive
int busy_thr_num; // number of worker threads currently running a task
int destroy_thr_num; // number of idle worker threads that have been asked to exit
int queue_front; // index of the next task to dequeue
int queue_rear; // index of the next free slot to enqueue into
int queue_size; // number of tasks currently queued
int queue_max_size; // capacity of task_queue
int shutdown; // set to true once the pool is being destroyed
};
// Entry point of every worker thread; the argument is the threadpool_t*
void* worker_fun(void* threadpool);
// Entry point of the admin thread that grows and shrinks the pool; the argument is the threadpool_t*
void* admin_fun(void* threadpool);
// Probes a thread id with signal 0; returns false only when pthread_kill reports ESRCH
int is_thread_alive(pthread_t tid);
// Releases the pool's memory and synchronisation objects
int threadpool_free(threadpool_t* pool);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){
int i;
printf("threadpool_create start==\n");
threadpool_t *pool = NULL;
do{
if((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL){
printf("malloc threadpool failed\n");
break;
}
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num;
pool->queue_size = 0;
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false;
//根据最大线程个数开辟线程空间
pool->tids = (pthread_t*)malloc(sizeof(pthread_t)*max_thr_num);
if(pool->tids == NULL){
printf("malloc threads error\n");
break;
}
memset(pool->tids, 0, sizeof(pthread_t) * max_thr_num);
//任务队列开辟空间
pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_t)*queue_max_size);
if(pool->task_queue == NULL){
printf("malloc task_queue error\n");
break;
}
//初始化互斥锁,条件变量
if(pthread_mutex_init(&(pool->lock),NULL) != 0
|| pthread_mutex_init(&(pool->busy_lock), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init lock or cond failed\n");
break;
}
//创建若干工作线程 和 一个管理线程
for(i = 0; i < min_thr_num; i++){
pthread_create(&(pool->tids[i]), NULL, worker_fun, (void*)pool);
printf("thread[%d] %d created\n", i, (unsigned int)pool->tids[i]);
}
pthread_create(&(pool->admin_id), NULL, admin_fun, (void*)pool);
return pool;
}while(0);
threadpool_free(pool);
return NULL;
}
/*
 * Add a task to the pool.
 *
 * Blocks while the queue is full. The pool never takes ownership of arg: the
 * caller must keep it valid until the task has run and release it afterwards.
 *
 * Returns 0 when the task was queued, -1 when pool or function is NULL or the
 * pool is shutting down.
 */
int threadpool_add(threadpool_t *pool, void*(*function)(void*), void* arg)
{
    if (pool == NULL || function == NULL) {
        return -1;
    }

    pthread_mutex_lock(&(pool->lock));
    /* Wait for a free slot, unless the pool is being torn down. */
    while ((pool->queue_size == pool->queue_max_size) && !(pool->shutdown)) {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }
    if (pool->shutdown) {
        /* Must return here: falling through would enqueue into a dying pool and unlock twice. */
        pthread_mutex_unlock(&(pool->lock));
        return -1;
    }

    /* Store the task at the rear; any stale arg left in the slot is simply overwritten,
       never freed, because it belongs to whoever submitted that earlier task. */
    threadpool_task_t *slot = &pool->task_queue[pool->queue_rear];
    slot->function = function;
    slot->arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* advance rear, wrapping around */
    pool->queue_size++;

    /* Wake one worker waiting for work. */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
}
// Worker thread entry point.
// threadpool is the threadpool_t* this worker belongs to. The thread loops forever:
// it waits for a task, dequeues it under pool->lock, runs it outside the lock, and
// keeps busy_thr_num up to date under pool->busy_lock. It leaves only via pthread_exit,
// either because the admin thread asked idle workers to retire or because the pool
// is shutting down.
void* worker_fun(void* threadpool)
{
threadpool_t *pool = (threadpool_t*)threadpool;
threadpool_task_t task;
while(true){
pthread_mutex_lock(&(pool->lock));
// Sleep while there is nothing to do and the pool is still running
while((pool->queue_size == 0) && (!pool->shutdown)){
printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
// Releases pool->lock and blocks on pool->queue_not_empty; the lock is held again when the call returns
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
// Retire idle threads when a number of them has been requested to exit
// NOTE(review): destroy_thr_num is decremented even when this thread then stays alive
if(pool->destroy_thr_num > 0){
pool->destroy_thr_num--;
// Exit only while the live count is above the pool's minimum
if(pool->live_thr_num > pool->min_thr_num){
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));// release the lock before leaving
// NOTE(review): the thread exits without detaching itself and its tids slot is not cleared here — confirm who reaps it
pthread_exit(NULL);
}
}
}
// When shutdown is set, this worker exits (tasks still queued are not run by it)
if(pool->shutdown){
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pthread_exit(NULL);
}
// Normal case: take the task at the front of the ring buffer
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
pool->queue_size--;
// Tell producers waiting in threadpool_add that a slot is free again
pthread_cond_broadcast(&(pool->queue_not_full));
pthread_mutex_unlock(&(pool->lock));
// Run the task outside pool->lock so other workers can dequeue meanwhile
printf("thread 0x%x is working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->busy_lock));
pool->busy_thr_num++;
pthread_mutex_unlock(&(pool->busy_lock));
// A NULL argument is only reported; the task is still invoked below
if(task.arg == NULL){
printf("error: task arg is NULL\n");
}
(*(task.function))(task.arg);
// Task finished; this worker is no longer busy
printf("thread 0x%x end working\n",(unsigned int)pthread_self());
pthread_mutex_lock(&(pool->busy_lock));
pool->busy_thr_num--;
pthread_mutex_unlock(&(pool->busy_lock));
}
// Not reached: the loop above has no break and leaves only through pthread_exit
pthread_exit(NULL);
}
/*
 * Admin (manager) thread entry point.
 *
 * threadpool is the threadpool_t* to manage. Every DEFAULT_TIME seconds the
 * thread samples the queue length and the live/busy worker counts, then
 *   - starts up to DEFAULT_THREAD_VARY workers when at least MIN_WAIT_TASK_NUM
 *     tasks are waiting and the pool is below max_thr_num;
 *   - asks DEFAULT_THREAD_VARY idle workers to retire when fewer than half of
 *     the live workers are busy and the pool is above min_thr_num (workers
 *     themselves refuse to drop below the minimum).
 * Returns NULL once shutdown is observed.
 */
void* admin_fun(void* threadpool)
{
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;

    while (!pool->shutdown) {
        sleep(DEFAULT_TIME);

        pthread_mutex_lock(&(pool->lock));
        /* Shutdown may have been requested while sleeping: do not resize a dying pool. */
        if (pool->shutdown) {
            pthread_mutex_unlock(&(pool->lock));
            break;
        }
        int queue_size = pool->queue_size;
        int live_thr_num = pool->live_thr_num;
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->busy_lock));
        int busy_thr_num = pool->busy_thr_num;
        pthread_mutex_unlock(&(pool->busy_lock));

        /* Grow: reuse slots that are empty or whose thread is reported gone. */
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                    && pool->live_thr_num < pool->max_thr_num; i++) {
                if (pool->tids[i] == 0 || !is_thread_alive(pool->tids[i])) {
                    pthread_t tid;
                    /* Only count the thread when it really started; on failure the
                       slot keeps its previous value and we stop trying this round. */
                    if (pthread_create(&tid, NULL, worker_fun, (void*)pool) != 0) {
                        printf("create worker thread failed\n");
                        break;
                    }
                    pool->tids[i] = tid;
                    add++;
                    pool->live_thr_num++;
                }
            }
            pthread_mutex_unlock(&(pool->lock));
        }

        /* Shrink: request a batch of idle workers to exit. */
        if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
            pthread_mutex_lock(&(pool->lock));
            pool->destroy_thr_num = DEFAULT_THREAD_VARY;
            pthread_mutex_unlock(&(pool->lock));
            for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
                /* Wake idle workers; each one decides for itself whether to terminate. */
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }
    }
    return NULL;
}
/*
 * Shut the pool down and release it.
 *
 * Sets the shutdown flag, wakes every waiting worker and every producer
 * blocked in threadpool_add, joins the admin thread and all worker threads,
 * then frees the pool. Tasks still queued are not executed. The caller must
 * make sure no other thread uses the pool after this call returns.
 *
 * Returns 0 on success, -1 when pool is NULL.
 */
int threadpool_destroy(threadpool_t *pool)
{
    int i;

    printf("threadpool_destroy start==\n");
    if (pool == NULL) {
        return -1;
    }

    /* Set the flag and broadcast while holding the lock: a worker that has just
       tested shutdown but not yet started waiting cannot miss the wake-up. */
    pthread_mutex_lock(&(pool->lock));
    pool->shutdown = true;
    pthread_cond_broadcast(&(pool->queue_not_empty));
    pthread_cond_broadcast(&(pool->queue_not_full));
    pthread_mutex_unlock(&(pool->lock));

    /* Wait for the admin thread first so tids is no longer modified. */
    pthread_join(pool->admin_id, NULL);

    /* Join every slot that ever held a thread. Live workers are not necessarily
       stored in the first live_thr_num slots once the pool has been resized. */
    for (i = 0; i < pool->max_thr_num; i++) {
        if (pool->tids[i] != 0) {
            pthread_join(pool->tids[i], NULL);
        }
    }

    threadpool_free(pool);
    printf("threadpool_destroy end==\n");
    return 0;
}
/*
 * Release the memory and synchronisation objects owned by the pool.
 *
 * All pool threads must already have terminated. The mutexes and condition
 * variables are destroyed only when pool->tids is set, which is the same
 * guard the original code used.
 *
 * Returns 0 on success, -1 when pool is NULL.
 */
int threadpool_free(threadpool_t *pool)
{
    printf("threadpool_free start==\n");
    if (pool == NULL) {
        return -1;
    }

    /* free(NULL) is a no-op, so no guard is needed. */
    free(pool->task_queue);

    if (pool->tids) {
        free(pool->tids);
        /* Destroy the mutexes unlocked: destroying a locked mutex is undefined behaviour. */
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_destroy(&(pool->busy_lock));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }

    free(pool);
    printf("threadpool_free end==\n");
    return 0;
}
/* Return the number of live worker threads, read while holding pool->lock. */
int threadpool_all_threadnum(threadpool_t *pool)
{
    pthread_mutex_t *guard = &pool->lock;
    int live_snapshot;

    pthread_mutex_lock(guard);
    live_snapshot = pool->live_thr_num;
    pthread_mutex_unlock(guard);

    return live_snapshot;
}
/* Return the number of workers currently running a task, read while holding pool->busy_lock. */
int threadpool_busy_threadnum(threadpool_t *pool)
{
    pthread_mutex_t *guard = &pool->busy_lock;
    int busy_snapshot;

    pthread_mutex_lock(guard);
    busy_snapshot = pool->busy_thr_num;
    pthread_mutex_unlock(guard);

    return busy_snapshot;
}
/*
 * Probe a thread id by sending it signal 0 (no signal is delivered, only the
 * error checking is performed). Returns 0 when pthread_kill reports ESRCH and
 * 1 for every other result.
 */
int is_thread_alive(pthread_t tid)
{
    const int probe_rc = pthread_kill(tid, 0);

    return probe_rc != ESRCH;
}
//测试
#if 1
/*
 * Demo task: prints the task number pointed to by arg, sleeps for one second
 * and prints a completion message. arg must point to a valid int.
 * Always returns NULL.
 */
void* process(void* arg)
{
    int task_id = *(int *)arg;

    printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(), task_id);
    sleep(1);
    printf("task %d is end\n", task_id);
    /* The function is declared to return void*, so it must return a value. */
    return NULL;
}
/*
 * Demo driver: creates a pool (3 initial workers, at most 100 workers, queue
 * capacity 100), submits 20 tasks, waits ten seconds for them to run, prints
 * the task numbers and destroys the pool.
 * Returns 0 on success, 1 when the pool cannot be created.
 */
int main(void)
{
    threadpool_t *pool = threadpool_create(3, 100, 100);
    if (pool == NULL) {
        /* Without this check every later call would dereference NULL. */
        printf("threadpool_create failed\n");
        return 1;
    }
    printf("pool inited\n");

    /* num must outlive the tasks: each task receives a pointer into this array. */
    int num[20], i;
    for (i = 0; i < 20; i++) {
        num[i] = i;
        printf("add task %d\n",i);
        if (threadpool_add(pool, process, (void*)&num[i]) != 0) {
            printf("add task %d failed\n", i);
        }
    }

    /* Give the workers time to drain the queue before shutting down. */
    sleep(10);
    for (i = 0; i < 20; i++) {
        printf("%3d ", num[i]);
    }
    threadpool_destroy(pool);
    return 0;
}
#endif
完毕!