pthread线程池(一)2019-08-15

\color{blue}{线程池的实现原理}
线程池的实现原理基本上就是生产者与消费者模型的扩展,即刚开始开出一定数量的线程,以基本生产者消费者模型为基本原理,不断使用这些线程从"任务队列"中取出任务进行处理,本线程池中的线程通过竞争关系取任务进行处理.

实现过程

  • 定义封装了Task类,具体的任务实现可以通过继承该Task类进行实现.
  • 定义Threadpool线程池类,类内定义了AddTask(添加任务),stop(停止销毁线程池),Task* take(取任务),createthread(创造线程)和线程回调函数等成员,其中在运用pthread_create创建线程时需要第三个参数必须指向一个静态函数.
  • 对线程的同步机制进行了封装,包括信号量sem,互斥锁Locker以及条件变量cond类,方便以后的调用.
    \color{blue}{Locker.h}
#ifndef __LOCKER__
#define __LOCKER__
#include <pthread.h>
#include <exception>
//互斥锁类
class Locker
{
    public:
        Locker()
        {
            if(pthread_mutex_init(&mutex, NULL)!=0)
            {
                throw std::exception();
            }
        }
        ~Locker()
        {
            pthread_mutex_destroy(&mutex);
        }
        bool lock()
        {
            return pthread_mutex_lock(&mutex)==0;
        }
        bool unlock()
        {
            return pthread_mutex_unlock(&mutex)==0;
        }
        pthread_mutex_t* Get()
        {
            return &mutex;
        }
    private:
        pthread_mutex_t mutex;

};
//封装条件变量类
class cond
{
    public:
        cond()
        {
            if(pthread_cond_init(&mcond,NULL)!=0)
            {
                throw std::exception();
            }
        }
        ~cond()
        {
            pthread_cond_destroy(&mcond);
        }
        bool wait(pthread_mutex_t* mutex)
        {
            return pthread_cond_wait( &mcond, mutex)==0;
        }
        bool signal()
        {
            return pthread_cond_signal(&mcond)==0;
        }
        bool signalAll()
        {
            return pthread_cond_broadcast(&mcond)==0;
        }
    private:
        pthread_cond_t mcond;
};
//通过使用ScopeLock类在作用域内会自动调用构造函数和析构函数的优点,方便了互斥锁的加锁和解锁操作
class ScopeLock{
public:
    ScopeLock( Locker& lock ):mlock(lock){mlock.lock();};
    ~ScopeLock( ){mlock.unlock();}
private:
    Locker& mlock;
};
#endif

\color{blue}{Thread.h}

#ifndef __THREAD_OR__
#define __THREAD_OR__
#include <deque>
#include <pthread.h>
#include <string>
#include <stdio.h>
#include "Locker.h"
//任务类
class Task
{
    public:
        virtual void execute()=0;     //纯虚函数,在子类中实现
        void run()
        {
            execute();
            delete this;
        }
};
//线程池类
class Threadpool
{
    public:
        Threadpool(int threadnum);   
        ~Threadpool();
        void AddTask(Task* task);     
        void stop();
        Task* take();      //取任务
        int size();          //任务队列中任务个数   
        int createthread();     //创建线程
        static void* createthreadcb(void* arg);    //线程回调
     private:
        volatile bool             isRunning; //告诉编译器不要进行任何优化,运行状态
        int                              threadNums;   //线程池内线程的数量 
        std::deque<Task*>    taskQueue;   //任务队列
        pthread_t*                 tid;               //线程标识符
        Locker                    mlocker;       //互斥锁
        cond                       mcond;     //条件变量
};
#endif

\color{blue}{Thread.cpp}

#include <iostream>
#include <string>
#include "thread.h"
#include <assert.h>
#include <stdlib.h>
#include "Locker.h"

Threadpool::Threadpool(int threadnum):
    threadNums(threadnum){
        isRunning = true;
        createthread();
};

Threadpool::~Threadpool()
{
    stop();
    for(std::deque<Task*>::iterator it = taskQueue.begin();
            it!=taskQueue.end();++it)
    {
        delete *it;
    }
    taskQueue.clear();
}

int Threadpool::createthread()
{
    ScopeLock guard(mlocker);
    tid = (pthread_t*)malloc(sizeof(pthread_t)*threadNums);
    for(int i=0; i<threadNums; ++i)
    {
        pthread_create(&tid[i],NULL,createthreadcb,(void*)this);
       //把本身为void*类型的this指针重新强制类型转换为Threadpool* 类型
   }
    return 0;
}

void Threadpool::AddTask(Task* task)
{
    ScopeLock guard(mlocker);
    taskQueue.push_back(task);
    int size = taskQueue.size();
    mcond.signal();
    return;
}

int Threadpool::size()
{
    ScopeLock guard(mlocker);
    int size = taskQueue.size();
    return size;
}

void Threadpool::stop()
{
    if(!isRunning)
    {
        return;
    }
    isRunning = false;
    mcond.signalAll();     //唤醒线程池内所有阻塞的线程,stop
    for(int i=0; i<threadNums; ++i)
    {
        pthread_join(tid[i],NULL);
    }
    free(tid);
    tid = NULL;
}
//取任务
Task* Threadpool::take()
{
    Task* task=NULL;
    while(!task)
    {
        {
            ScopeLock guard(mlocker);    //锁
            while(taskQueue.empty() && isRunning)
            {
                 mcond.wait(mlocker.Get());    //阻塞
             };
             if(!isRunning)
             {
                 break;
             };
        }
        ScopeLock guard(mlocker);
    std::cout << "take a task" << std::endl;
        task = taskQueue.front();
        taskQueue.pop_front();
    }
    return task;
}

void* Threadpool::createthreadcb(void* arg)
{
    pthread_t mtid = pthread_self();
    Threadpool* pool = (Threadpool*) arg;
    while(pool->isRunning)
    {
        Task* task = pool->take();
        if(!task)
        {
            printf("thread %lu will exit\n",mtid);
            break;
        }
        assert(task);
        task->run();  //任务执行,调用子任务类内的execute函数内容
}
    return 0;
}

\color{blue}{main.cpp}
测试main函数:

#include <iostream>
#include <stdio.h>
#include "Locker.h"
#include "thread.h"
#include <unistd.h>
#include <stdlib.h>

class mytask : public Task
{
    public:
        virtual void execute()
        {
            std::cout << "Task A" << std::endl;
            std::cout << "thread:" << pthread_self() << std::endl;
            sleep(1);
        }
};
int main()
{
    Threadpool threadpool(10);   //创建十个线程
    for(int i=0; i<20; i++)
    {
        threadpool.AddTask(new mytask);
    sleep(1);
    }
    while(1)
    {
         if(threadpool.size()==0)     //当任务队列中的任务为0之后,stop
         {
             threadpool.stop();
             std::cout << "exit now all" << std::endl;
             exit(0);
         };
    sleep(2);
    };
    return 0;
}

\color{blue}{编译}
g++ main.cpp thread.cpp -lpthread
\color{blue}{运行结果}

take a task
Task A
thread:139751073740544
take a task
Task A
thread:139751140882176
take a task
Task A
thread:139751132489472
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751107311360
take a task
Task A
thread:139751098918656
take a task
Task A
thread:139751124096768
take a task
Task A
thread:139751090525952
take a task
Task A
thread:139751090525952
take a task
Task A
thread:139751073740544
take a task
Task A
thread:139751140882176
take a task
Task A
thread:139751132489472
take a task
Task A
thread:139751115704064
take a task
Task A
thread:139751149274880
take a task
Task A
thread:139751107311360
take a task
Task A
thread:139751098918656
take a task
Task A
thread:139751124096768
take a task
Task A
thread:139751082133248
take a task
Task A
thread:139751090525952
thread 139751073740544 will exit
thread 139751115704064 will exit
thread 139751132489472 will exit
thread 139751140882176 will exit
thread 139751107311360 will exit
thread 139751098918656 will exit
thread 139751149274880 will exit
thread 139751124096768 will exit
thread 139751082133248 will exit
exit now all

因为固定了线程的数量,本线程池并不会自动扩展,下一篇的线程池可以实现自动扩展和减少以维持线程池的稳定.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 13,902评论 1 32
  • 1.设计模式是什么? 你知道哪些设计模式,并简要叙述?设计模式是一种编码经验,就是用比较成熟的逻辑去处理某一种类型...
    龍飝阅读 6,634评论 0 12
  • 【JAVA 线程】 线程 进程:是一个正在执行中的程序。每一个进程执行都有一个执行顺序。该顺序是一个执行路径,或者...
    Rtia阅读 7,761评论 2 20
  • 序言 近日后台需要一些数据,需要从网上爬取,但是爬取的过程中,由于访问速度太频繁,造成IP被封,最终通过线程池解决...
    非专业程序员阅读 4,326评论 0 3
  • 今天下班回来后,开始找钥匙开门,虽然在包里翻了好久但是发现还是找到了,开始嘲笑自己,以前的时候总是丢钥匙,现在竟然...
    不知道已经被占用阅读 4,849评论 0 0

友情链接更多精彩内容