并发编程

并发在下列情况中是很有用的:

  • 访问慢速I/O设备。
  • 与人交互。
  • 通过推迟工作以降低延迟。
  • 服务多个网络客户端。
  • 在多核机器上进行并行计算。

现代操作系统提供了三种基本的构造并发程序的方法:

  • 进程。
  • I/O多路复用。
  • 线程。

基于进程的并发编程

构建并发程序最简单的方法就是进程,进程有独立的地址空间,不用担心一个虚拟地址覆盖另一个进程的虚拟地址,但会导致进程间通信比较麻烦。此外,基于进程的并发往往比较慢,因为进程控制和IPC的开销很高。下面是基于进程的echo服务器版本:

#include<stdio.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<signal.h>
#include<sys/types.h>
#include<sys/wait.h>

#define SERVER_PORT 9090
#define BUFFER_SIZE 1024

void sigchild_handler(int sig){
    while(waitpid(0,NULL,WNOHANG) > 0)
        ;
}

int main(int argc, char const *argv[])
{
    struct sockaddr_in server,client;
    int sfd,cfd;
    socklen_t clen;
    int n;
    char buf[BUFFER_SIZE];
    pid_t pid;

    /* register the signal handler,child process have the same handler,
     * but only parent process receive the signal */
    struct sigaction newact;
    newact.sa_handler = sigchild_handler;
    sigemptyset(&newact.sa_mask);
    newact.sa_flags = 0;
    sigaction(SIGCHLD,&newact,NULL);    

    /* initial socket */
    sfd = socket(AF_INET,SOCK_STREAM,0);
    memset(&server,0,sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = htonl(INADDR_ANY);
    server.sin_port = htons(SERVER_PORT);

    bind(sfd,(struct sockaddr *)&server,sizeof(server));
    listen(sfd,20);

    while(1){
        clen = sizeof(client);
        cfd = accept(sfd,(struct sockaddr *)&client,&clen);
        pid = fork();
        if(pid > 0){ /* parent process */
            close(cfd);
        }else if( pid == 0 ){
            while( (n = read(cfd,buf,BUFFER_SIZE)) > 0){
                write(STDOUT_FILENO,buf,n);
                write(cfd,buf,n);
            }
            close(cfd);
            return 0; /* child process quit */
        }
    }
    close(sfd);
    return 0;
}

基于I/O多路复用的并发编程

在我看来,I/O多路复用就类似于餐馆的服务员。服务员负责监听一些桌子,如果桌子有客人来了,需要给客人菜单;如果某张桌子的菜好了,需要给他们上菜;如果客人吃完了,需要收拾桌子;如果没有事件,那么服务员处于等待的状态,等待某张桌子中上述事件中的一个的发生。很明显,这是并发了,因为通常都是一个服务员,为多张桌子服务。I/O多路复用的思想和这个类似。

I/O多路复用可以用作并发事件驱动程序的基础,在事件驱动程序中,某些事件导致流向前推进。一般的思路是将逻辑流模型化为状态机。基于I/O多路的事件驱动编程的优点是,它比基于线程和进程的设计给了程序员更多的对程序行为的控制,并且,它有明显的性能优势,现代高性能服务器,如Node.js,nginx都使用了这种编程方式。事件驱动设计的明显的缺点就是编码复杂。就如服务员这个比喻,服务员很多情况下无法把一件事做完再去做下一件事,例如如果客人点菜很慢,站在那里等是很浪费资源的。所以,基于这种模式的编程每次执行的代码应该是很短的。通常需要把一个事件分成若干部分,所以I/O多路复用的编码比较复杂。

I/O多路复用的基本函数是select函数,它要求内核挂起进程,只有一个或多个I/O事件发生后,才将控制返回给应用程序。select及其相关函数的原型如下:

/* 返回已准备好的描述符的非零的个数,若出错则返回-1 */
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
/* 删除set中的fd */
void FD_CLR(int fd, fd_set *set);
/* 判断fd是否在set中 */
int  FD_ISSET(int fd, fd_set *set);
/* 将fd添加到set中 */
void FD_SET(int fd, fd_set *set);
/* 将set清零 */
void FD_ZERO(fd_set *set);

下面使用select函数对echo服务器进行改写。

#include<stdio.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<sys/select.h>

#define SERVER_PORT 9090
#define BUFFER_SIZE 1024

int main(int argc, char const *argv[])
{
    struct sockaddr_in server,client;
    socklen_t clen;
    int sfd;
    char buf[BUFFER_SIZE];
    int n;

    /* initial socket */
    sfd = socket(AF_INET,SOCK_STREAM,0);
    memset(&server,0,sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = htonl(INADDR_ANY);
    server.sin_port = htons(SERVER_PORT);
    bind(sfd,(struct sockaddr *)&server,sizeof(server));
    listen(sfd,20);

    /* init fd_set */
    int cfd[FD_SETSIZE];
    int maxi = 0;
    int maxfd = sfd;
    int i;
    for(i = 0;i<FD_SETSIZE;i++){
        cfd[i] = -1;
    }
    fd_set allset,tempset;
    FD_ZERO(&allset);
    FD_SET(sfd,&allset);
    int nready;

    while(1){
        tempset = allset;
        nready = select(maxfd+1,&tempset,NULL,NULL,NULL);
        if( FD_ISSET(sfd,&tempset) ){
             clen = sizeof(client);
            int ret = accept(sfd,(struct sockaddr *)&client,&clen);
            for(i = 0;i<FD_SETSIZE;i++){
                if(cfd[i] == -1){
                    cfd[i] = ret;
                    break;
                }
            }
            if(i != FD_SETSIZE){
                if(ret > maxfd){ /* listen to all file descriptors */
                    maxfd = ret;
                }
                if( i > maxi){
                    maxi = i;
                }
                FD_SET(ret,&allset);
            }
            if(--nready == 0){
                continue;
            }
        }
        for(i=0;i<=maxi;i++){
            if( FD_ISSET(cfd[i],&tempset) ){
                if( (n = read(cfd[i],buf,BUFFER_SIZE)) >0 ){
                    write(STDOUT_FILENO,buf,n);
                    write(cfd[i],buf,n);
                }else{
                    close(cfd[i]);
                    FD_CLR(cfd[i],&allset);
                    cfd[i] = -1;
                }
                if(--nready == 0){
                    break;
                }
            }
        }
    }

    close(sfd);
    return 0;
}

基于线程的并发编程

基于线程的逻辑流结合了基于进程和基于I/O多路复用的流的特性。同进程一样,线程由内核自动调度,并且内核通过一个整数ID来识别线程。同基于I/O多路复用的流一样,多个线程运行在单一进程的上下文中,因此共享这个进程虚拟地址空间的所有内容,包括它的代码、数据、堆、共享库和打开的文件。下面基于线程对echo服务器进行改写:

#include<stdio.h>
#include<string.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<pthread.h>

#define SERVER_PORT 9090
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 1024

struct socket_info{
    struct sockaddr_in client;
    int cfd;
};

struct socket_info clients[MAX_CLIENTS];

void *echo(void *arg){
    struct socket_info *si = (struct socket_info *)arg;
    char buf[BUFFER_SIZE];
    int n;
    pthread_detach(pthread_self());
    while( (n = read(si->cfd,buf,BUFFER_SIZE)) >0){
        write(STDOUT_FILENO,buf,n);
        write(si->cfd,buf,n);
    }
    close(si->cfd);
    si->cfd = -1;
    return NULL;
}

int main(int argc, char const *argv[])
{
    struct sockaddr_in server,client;
    socklen_t clen;
    int sfd,cfd;

    /* initial socket */
    sfd = socket(AF_INET,SOCK_STREAM,0);
    memset(&server,0,sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = htonl(INADDR_ANY);
    server.sin_port = htons(SERVER_PORT);
    bind(sfd,(struct sockaddr *)&server,sizeof(server));
    listen(sfd,20);

    int i;
    pthread_t tid;
    for(i=0;i<MAX_CLIENTS;i++){
        clients[i].cfd = -1;
    }
    while(1){
        clen = sizeof(client);
        cfd = accept(sfd,(struct sockaddr *)&client,&clen);
        for(i = 0;i<MAX_CLIENTS;i++){
            if(clients[i].cfd == -1){
                clients[i].cfd = cfd;
                clients[i].client = client;
                pthread_create(&tid,NULL,echo,(void *)&clients[i]);
                break;
            }
        }
    }
    
    close(sfd);
    return 0;
}

这里要注意和基于进程的echo服务器进行比较。在线程中每个buf需要在线程中的栈进行创建,而进程中是没有这个过程,因为进程是独立的虚拟地址空间。同样的还有client和cfd。但是,在I/O复用中,buf只有一个,这是因为每次把buf的内容发送出去后才进入下一次,中间不会被打断,而线程中可能被打断,所以需要多个buf。

线程池

虽然线程的创建成本低于进程的创建成本,但总的来说,还是比较大的。所以,为了减少线程的创建次数,就有了线程池。线程池的基本思想是,复用线程。也就是说,线程执行完任务后,不是销毁,而是等待下一个任务。这个类似于生产者消费者模型,消息队列和命令模式。线程池的实现的核心有一个任务队列,任务队列里面有执行函数的地址和执行函数的参数。此外,还需要有一个线程来管理其他线程,如线程不够用,需要创建线程;空闲的线程太多,需要删除一些线程。这个线程采取轮询的方式,定时检查。线程池的代码基本上都是参考了别人的,自己敲了一遍,稍微修改了一下。

下面给出线程池的实现方式,采取把代码分段注释的方式,整个代码是完整的。由于线程池的代码有点多,所以就单独写了一个C文件。先看看头文件和定义的结构体和函数。

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

typedef struct threadpool_t threadpool_t;
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
int threadpool_destroy(threadpool_t *pool);

#endif
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<signal.h>
#include <errno.h>
#include"threadpool.h"

#define MAX_WAIT_TASK 2
#define INTERVAL_TIME 1
#define THREAD_STEP_NUMBER 1

typedef struct{
    void *(*function)(void *); /* thread will call this function */
    void *arg; /* function args */
}threadpool_task_t;

struct threadpool_t{
    pthread_mutex_t lock;
    pthread_mutex_t thread_counter;
    pthread_cond_t queue_not_full;
    pthread_cond_t queue_not_empty;

    pthread_t *threads; /* thread array */
    pthread_t adjust_tid; /* manager thread */
    int min_thr_num; /* threads information */
    int max_thr_num;
    int live_thr_num;
    int busy_thr_num;
    int wait_exit_num;

    threadpool_task_t *task_queue; /* task queue */
    int queue_front; /* task queue imformation */
    int queue_rear;
    int queue_size;
    int queue_max_size;

    int shutdown; /* if shutdown is true,it will notify all threads exit,then thread pool exit */
};

void *threadpool_thread(void *threadpool);
void *adjust_thread(void *threadpool);
int threadpool_free(threadpool_t *threadpool);
int is_thread_alive(pthread_t tid);

这里我感觉体现的C语言中的封装思想,如果结构体只在本文件中使用,那么就不需要在头文件中定义,但是函数中又使用了这个类型,这时候我觉得这种处理方式不错,把typedef的声明写在头文件中。这个线程池的每个函数的代码都不难,难的是这样一种思想,我开始在想这个问题的时候,没有想到生产者消费者模型,也没有想到如何让线程休息和退出。这里是C语言中数组的典型表示,需要维护一个长度,然后知道一个指向数组第一个数的指针就可以了。然后是关于队列的表示,这里的表示也是类似的,使用数组,在加上队列的头尾和队列的长度。

threadpool_t *threadpool_create(int min_thr_num,int max_thr_num,int queue_max_size){
    int i;
    threadpool_t *pool = NULL;
    /* using do while in order to avoid using goto */
    do{
        if( (pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL ){
            break;
        }

        /* initial lock */
        if(pthread_mutex_init(&(pool->lock),NULL) != 0
            ||pthread_mutex_init(&(pool->thread_counter),NULL) != 0
            ||pthread_cond_init(&(pool->queue_not_empty),NULL) !=0
            ||pthread_cond_init(&(pool->queue_not_full),NULL) !=0){
                break;
            }

        /* inital threads */
        pool->threads = (pthread_t *)(malloc(sizeof(pthread_t)*max_thr_num));
        if(pool->threads == NULL){
            break;
        }
        memset(pool->threads,0,sizeof(pthread_t)*max_thr_num);
        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->live_thr_num = min_thr_num;
        pool->busy_thr_num = 0;
        for(i=0;i<min_thr_num;i++){
            pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
        }
        pthread_create(&(pool->adjust_tid),NULL,adjust_thread,(void *)pool);

        /* inital queue task */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
        if(pool->task_queue == NULL){
            break;
        }
        pool->queue_max_size = queue_max_size;
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->queue_size = 0;

        pool->shutdown = 0;

        return pool;
    }while(0);
    printf("create thread pool fail.\n");
    threadpool_free(pool);
    return NULL;
}

创建线程池的代码没什么好说的,唯一的亮点是do while的用法,这种可以使得出错后跳转到错误处理,避免了使用goto语句。

void *adjust_thread(void *threadpool){
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;
    while( !pool->shutdown ){
        sleep(INTERVAL_TIME);
        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;
        int live_thr_num = pool->live_thr_num;
        pthread_mutex_unlock(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num;
        pthread_mutex_unlock(&(pool->thread_counter));
        printf("live thrnum = %d,queue_size = %d\n",live_thr_num,queue_size);

        /* check whether need to add thread */
        if(queue_size > MAX_WAIT_TASK && live_thr_num < pool->max_thr_num){
            printf("add threads\n");
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            for(i=0;i<pool->max_thr_num && add<THREAD_STEP_NUMBER 
                && pool->live_thr_num < pool->max_thr_num;i++){
                if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){
                    if( pool->threads[i] != 0 ){
                        printf("reap threads when delete threads\n");
                        pthread_join(pool->threads[i],NULL);
                    }
                    pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
                    printf("%d\n",pool->live_thr_num);
                    pool->live_thr_num ++;
                    add++;
                }
            }
            pthread_mutex_unlock(&(pool->lock));
        }

        /* check whether need to delete thread */
        if((busy_thr_num*2)< live_thr_num && live_thr_num > pool->min_thr_num){
            pthread_mutex_lock(&(pool->lock));
            int del = THREAD_STEP_NUMBER;
            if(pool->live_thr_num - THREAD_STEP_NUMBER < pool->min_thr_num){
                del = pool->live_thr_num - pool->min_thr_num;
            }
            pool->wait_exit_num = del;
            pthread_mutex_unlock(&(pool->lock));
            printf("delete threads\n");
            for(i=0;i<THREAD_STEP_NUMBER;i++){
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }
    }
    return NULL;
}

这个是管理者线程,负责动态创建和回收线程,使用轮询的方式。

int threadpool_add(threadpool_t *pool,void *(*function)(void *arg),void *arg){
    pthread_mutex_lock(&(pool->lock));

    while( (pool->queue_size == pool->queue_max_size) && (!pool->shutdown) ){
        pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));
    }
    if(pool->shutdown){
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }

    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1)%pool->queue_max_size;
    pool->queue_size++;

    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
}

把任务加入到队列,这里需要注意同步的问题。

int threadpool_free(threadpool_t *pool){
    if(pool == NULL){
        return -1;
    }
    if(pool->task_queue){
        free(pool->task_queue);
    }
    if(pool->threads){
        free(pool->threads);
    }
    pthread_mutex_lock(&(pool->lock));
    pthread_mutex_destroy(&(pool->lock));
    pthread_mutex_lock(&(pool->thread_counter));
    pthread_mutex_destroy(&(pool->thread_counter));
    pthread_cond_destroy(&(pool->queue_not_empty));
    pthread_cond_destroy(&(pool->queue_not_full));
    free(pool);
    return 0;
}

int threadpool_destroy(threadpool_t *pool){
    int i;
    if(pool == NULL){
        return -1;
    }
    pool->shutdown = 1;
    pthread_join(pool->adjust_tid,NULL);

    /* let thread quit,and then reap the thread */
    for(i=0;i<pool->live_thr_num;i++){
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }
    for(i=0;i<pool->live_thr_num;i++){
        pthread_join(pool->threads[i],NULL);
    }
    threadpool_free(pool);
    return 0;
}

int is_thread_alive(pthread_t tid){
    int kill_rc = pthread_kill(tid,0);
    if(kill_rc == ESRCH){
        return 0;
    }
    return 1;
}

销毁线程,这个没有什么好说的。

使用线程池对echo服务器进行改写,发现对性能的提高不大,因为客户端是阻塞的,所以,不能让一个任务等待一个线程的结束,因为不知道什么时候会结束。这里就不贴代码了,和线程的代码基本上一样的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 一、产品分析: 1、产品概要: 运⽤区块链技术的去中⼼化机制、经济激励、数据共享的优势,为互联⽹应⽤重构⼀个价值再...
    杰操哥说阅读 1,051评论 0 1
  • 2017年8月10日 农历六月十九号 星期四 晴 今晚写日记,我的心情有些沉重。 昨天是时隔多年,第一次拾笔书写。...
    故园小屋阅读 1,814评论 0 0
  • 看她的书,我总是会陷入一种孤寂的状态 后来,听了别人推荐的《七月上》,歌手的声音很沧桑,就像安妮笔下的文字和故事,...
    半亩人阅读 455评论 6 5

友情链接更多精彩内容