创建线程:
#include <pthread.h>
pthread_t t;//声明一个线程
pthread_create (thread, attr, start_routine, arg)
终止线程:
pthread_exit (status) //销毁当前线程
例子一:创建多线程
// 线程的运行函数
void* say_hello(void* args){
cout << "Hello Runoob!" << endl;
cout<<"new thread:"<<pthread_self()<<endl; //打印当前子线程
return 0;
}
int main(){
pthread_t tid;
int ret =pthread_create(&tid,NULL,say_hello,NULL);
if(ret !=0) {
cout << "pthread_create error: error_code=" << ret << endl;
}
cout<<"entering main thread"<<endl;
cout<<"main thread:"<<pthread_self()<<endl; //打印当前主线程
sleep(1.0); //等待1s否则主线程销毁过快,子线程来不及反应;或者使用pthread_exit显式退出主线程,则子线程会继续执行
}
例子二:线程同步
pthread_join() 子线程阻碍当前线程,当前线程阻塞,直到子线程终止为止。
void* say_hello2(void*);
void*say_hello(void* args){
pthread_t tid;
int ret =pthread_create(&tid,NULL,say_hello2,NULL);
if(ret !=0) {
cout << "pthread_create error: error_code=" << ret << endl;
}
pthread_join(tid,NULL); //子线程二阻塞当前线程
cout << "Hello Runoob!" << endl;
cout<<"new thread:"<<pthread_self()<<endl;
return 0;
}
void*say_hello2(void* args){
sleep(1.5);
cout << "Hello Runoob2!" << endl;
cout<<"new thread2:"<<pthread_self()<<endl;
return 0;
}
int main(){
pthread_t tid;
int ret =pthread_create(&tid,NULL,say_hello,NULL);
if(ret !=0) {
cout << "pthread_create error: error_code=" << ret << endl;
}
cout<<"entering main thread"<<endl;
cout<<"main thread:"<<pthread_self()<<endl;
pthread_exit(NULL);
}
例子三:互斥锁 mutex
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;/*初始化互斥锁*/
pthread_mutex_init(&mutex,NULL);/*动态初始化互斥锁*/
pthread_mutex_lock(&mutex);//加锁
pthread_mutex_unlock(&mutex);//解锁
pthread_mutex_destroy(&mutex);//销毁互斥锁
代码:
pthread_mutex_t my_mutex;
// 线程的运行函数
void*print_msg(void*arg){
int tid = *((int*)arg);
int i=0;
pthread_mutex_lock(&my_mutex);
for(i=0;i<15;i++){
printf("output : %d-%d\n",tid,i);
usleep(100);
}
pthread_mutex_unlock(&my_mutex);
return NULL;
}
int main(){
pthread_t id1;
pthread_t id2;
pthread_mutex_init(&my_mutex,NULL);
pthread_create(&id1,NULL,print_msg,&id1);
pthread_create(&id2,NULL,print_msg,&id2);
pthread_join(id1,NULL); //阻塞主线程,防止主线程和锁提前结束或释放
pthread_join(id2,NULL); //阻塞主线程
pthread_mutex_destroy(&my_mutex);
cout << "Main: program exiting." << endl;
}
条件变量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;/*初始化条件变量*/
pthread_cond_signal(&cond);//发送信号量 跟wait函数不在同一个线程中
pthread_cond_wait(&cond,&mutex);//阻塞线程,等待条件变量,同时解锁互斥量
pthread_cond_destroy(&cond);//销毁条件变量
pthread_cond_signal函数,它在一个线程中,用来发送信号。一个是pthread_cond_wait函数,他在另一个线程中,用来接收信号。pthread_cond_wait函数会释放相应的锁,处于等待队列,让其他线程获得锁继续执行,这样其他线程才有机会给他发信号;当它接收到信号时,会重新去获得锁,如果没有获得锁,就阻塞等待,直到获得锁,才执行接收信号的相应操作。
例子:
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
bool finish = false;
struct node {
int n_number;
struct node *n_next;
} *head=NULL;
/*[thread_func]*/
static void cleanup_handler(void *arg)
{
printf("Cleanup handler of second thread./n");
if(arg) {
free(arg);
}
(void)pthread_mutex_unlock(&mtx);
pthread_cond_signal(&cond); //之所以signal一下是怕某个字线程一直wait状态
}
static void *thread_func(void *arg)
{
struct node*p =NULL;
pthread_cleanup_push(cleanup_handler, p);
while(!finish) {
pthread_mutex_lock(&mtx); //这个mutex主要是用来保证pthread_cond_wait的并发性
while(head==NULL&&!finish) {
cout<<"开始进入等待"<
//这个while要特别说明一下,单个pthread_cond_wait功能很完善,为何这里要有一个while (head == NULL)呢?因为pthread_cond_wait里的线程可能会被意外唤醒,如果这个时候head=NULL,则不是我们想要的情况。这个时候,应该让线程继续进入pthread_cond_wait
pthread_cond_wait(&cond, &mtx); // pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx,然后阻塞在等待对列里休眠,直到再次被唤醒(大多数情况下是等待的条件成立而被唤醒,唤醒后,该进程会先锁定先pthread_mutex_lock(&mtx);,再读取资源
//用这个流程是比较清楚的/*block-->unlock-->wait() return-->lock*/
}
p =head;
if(p) {
head=head->n_next;
cout<<"Got "<<p->n_number<<" from front of queue"<<endl;
free(p);
if(p->n_number>=9) {
finish=true;
}
}
pthread_mutex_unlock(&mtx); //临界区数据操作完毕,释放互斥锁
}
cout<<"结束"<
pthread_cleanup_pop(1); //当pthread_cleanup_pop()函数的参数为0时,仅仅在线程调用pthread_exit函数或者其它线程对本线程调用 pthread_cancel函数时才调用cleanup_handler
return0;
}
int main()
{
pthread_t tid;
pthread_t tid2;
int i;
struct node*p;
pthread_create(&tid, NULL, thread_func, NULL); //子线程会一直等待资源,类似生产者和消费者,但是这里的消费者可以是多个消费者,而不仅仅支持普通的单个消费者,这个模型虽然简单,但是很强大
pthread_create(&tid2,NULL,thread_func,NULL);
sleep(1.0);
for(i =0; i <10; i++) {
p = (node*)malloc(sizeof(struct node));
p->n_number= i;
pthread_mutex_lock(&mtx); //需要操作head这个临界资源,先加锁,
p->n_next = head; //生产者一直产生head加在末尾,消费者一直消费处于末尾的head
head= p;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mtx); //解锁
sleep(1.0);
}
// printf("thread 1 wanna end the line.So cancel thread 2./n");
// pthread_cancel(tid); //关于pthread_cancel,有一点额外的说明,它是从外部终止子线程,子线程会在最近的取消点,退出线程,而在我们的代码里,最近的取消点肯定就是pthread_cond_wait()了。关于取消点的信息,有兴趣可以google,这里不多说了
pthread_join(tid,NULL);
pthread_join(tid2,NULL);
printf("All done -- exiting/n");
return 0;
}
ijk线程池://生产者消费者模式,每个线程的for循环会不断的从任务池中取出任务来执行
typedef struct IjkThreadPoolContext {
pthread_mutex_t lock;
pthread_cond_t notify;
pthread_t*threads;
IjkThreadPoolTask *queue;
int thread_count; //线程数量
int queue_size; //任务队列数量
int queue_head; //队头标志,从该位置取出任务后+1
int queue_tail; //队尾标志,该位置一般为空
int pending_count; //等待执行的任务
int shutdown;
int started_count;
} IjkThreadPoolContext;
IjkThreadPoolContext*ijk_threadpool_create(int thread_count, int queue_size, int flags){
IjkThreadPoolContext *ctx;
ctx->threads= (pthread_t*)calloc(1,sizeof(pthread_t) * thread_count);
//创建thread_count个线程,void *calloc(size_t n, size_t size); 在内存的动态存储区中分配n个长度为size的连续空间,函数返回一个指向分配起始地址的指针
ctx->queue = (IjkThreadPoolTask *)calloc //创建queue_size个任务
(queue_size,sizeof(IjkThreadPoolTask));
}
static void *ijk_threadpool_thread(void *pool_ctx) //线程实际执行函数,该方法不停的从任务队列中取任务执行
{
IjkThreadPoolContext *ctx = (IjkThreadPoolContext *)pool_ctx;
IjkThreadPoolTask task;
for(;;) { //一个for循环,不断的从任务池中取任务来执行
pthread_mutex_lock(&(ctx->lock));
while((ctx->pending_count==0) && (!ctx->shutdown)) {
pthread_cond_wait(&(ctx->notify), &(ctx->lock)); //如果当前任务池没有任务就等待
}
if((ctx->shutdown==IJK_IMMEDIATE_SHUTDOWN) ||
((ctx->shutdown==IJK_LEISURELY_SHUTDOWN) &&
(ctx->pending_count==0))) {
break;
}
/* Grab our task */
task.function = ctx->queue[ctx->queue_head].function; //线程从任务池中取任务
task.in_arg = ctx->queue[ctx->queue_head].in_arg;
task.out_arg = ctx->queue[ctx->queue_head].out_arg;
ctx->queue_head= (ctx->queue_head+1) % ctx->queue_size;
ctx->pending_count-=1; //等待的任务数-1
pthread_mutex_unlock(&(ctx->lock));
(*(task.function))(task.in_arg, task.out_arg);
}
ctx->started_count--;
pthread_mutex_unlock(&(ctx->lock));
pthread_exit(NULL);
return(NULL);
}
int ijk_threadpool_add(IjkThreadPoolContext*ctx,Runablefunction,
void *in_arg, void *out_arg, int flags)
{
int err =0;
int next;
if(ctx ==NULL|| function ==NULL) {
returnIJK_THREADPOOL_INVALID;
}
if(pthread_mutex_lock(&(ctx->lock)) !=0) {
returnIJK_THREADPOOL_LOCK_FAILURE;
}
if (ctx->pending_count==MAX_QUEUE|| ctx->pending_count== ctx->queue_size) {
pthread_mutex_unlock(&ctx->lock);
returnIJK_THREADPOOL_QUEUE_FULL;
}
if(ctx->pending_count== ctx->queue_size-1) { //任务超过数量重新分配空间
int new_pueue_size = (ctx->queue_size*2) >MAX_QUEUE?MAX_QUEUE: (ctx->queue_size*2);
IjkThreadPoolTask *new_queue = (IjkThreadPoolTask *)realloc(ctx->queue, sizeof(IjkThreadPoolTask) * new_pueue_size);
if (new_queue) {
ctx->queue= new_queue;
ctx->queue_size= new_pueue_size;
}
}
next = (ctx->queue_tail+1) % ctx->queue_size;
do {
/* Are we shutting down ? */
if(ctx->shutdown) {
err =IJK_THREADPOOL_SHUTDOWN;
break;
}
/* Add task to queue */
ctx->queue[ctx->queue_tail].function= function; //将任务添加到队尾
ctx->queue[ctx->queue_tail].in_arg = in_arg;
ctx->queue[ctx->queue_tail].out_arg = out_arg;
ctx->queue_tail= next;
ctx->pending_count+=1;
/* pthread_cond_broadcast */
if(pthread_cond_signal(&(ctx->notify)) !=0) {
err =IJK_THREADPOOL_LOCK_FAILURE;
break;
}
} while(0);
if(pthread_mutex_unlock(&ctx->lock) !=0) {
err =IJK_THREADPOOL_LOCK_FAILURE;
}
return err;
}