高并发服务端架构(假想,并没有生产实践)

  • 流程说明:
  • 1、initThreadPool创建线程池,同时阻塞在decrement_count 函数的等待锁和条件变量
  • 2、主线程创建epoll对象,阻塞在epoll_wait等待时间,要是连接事件,则加入epoll对象,要是读写事件,则唤醒其中一个空闲的线程处理读写。
static unsigned int threadParameter[NUMBER][8];//线程参数
    pthread_t threadId[NUMBER];//线程id
    pthread_mutex_t threadLock[NUMBER];//线程锁
    pthread_cond_t count_nonzero[NUMBER];
    int count1[NUMBER]={0};
    static struct  dataPacket
    {
        struct epoll_event ev;
        struct epoll_event waitEvent[LINET];
        int sockNumber[LINET]={0};
        int MAX=0;
        int epfd=0;
    }ThreaDataPackage;
    void decrement_count (int i)   
    {

        pthread_mutex_lock (threadLock+i);
        while(count1[i]==0)
            pthread_cond_wait( count_nonzero+i, threadLock+i);
        count1[i]=0;
        pthread_mutex_unlock (threadLock+i);
    }
    void increment_count(int i)
    {
        pthread_mutex_lock(threadLock+i);
        pthread_cond_signal(count_nonzero+i);
        count1[i]=1;
        pthread_mutex_unlock(threadLock+i);
    }
    void * serverSocket(unsigned int *parameter)//线程主函数
    {   char buf[1024];
    char buff[1024];
    pthread_detach(pthread_self());
    while(1)
    {
        decrement_count (parameter[7]);
        printf("启动线程:%d\n",parameter[7]);
        memset(buf,0,sizeof(buf));
        memset(buff,0,sizeof(buff));
        int len=recv(parameter[1], buf, 1024, MSG_NOSIGNAL);//非阻塞模式的消息接收
        if(len>0)
        {
            printf("%s\n",buf);
        }
        if(len==0)
        {
            for(int i=0;i<LINET;i++)
            {
                if(parameter[1]==ThreaDataPackage.sockNumber[i])
                {   ThreaDataPackage.MAX--;
                ThreaDataPackage.sockNumber[i]=0;
                close(ThreaDataPackage.sockNumber[i]);
                printf("客户端%d下线\n",ThreaDataPackage.MAX);
                if (epoll_ctl(ThreaDataPackage.epfd, EPOLL_CTL_DEL,parameter[1], &ThreaDataPackage.ev) < 0)//加入epoll事件集合
                {
                    perror("epoll_ctl error:");

                }
                break;
                }
            }
        }
        sprintf(buff ,"你好客户端我是第%d您发送的是:",parameter[7]);
        strcat(buff,buf);
        len=send(parameter[1],buff,1024,MSG_NOSIGNAL);//非阻塞模式的消息发送
        memset(buff,0,sizeof(buff));
        parameter[0]= 0;//设置线程占用标志为"空闲"
    }
    }

    static int initThreadPool(void)//初始化数据
    {   
        int a=0;
        for(int i=0;i<NUMBER;i++)
        {
            threadParameter[i][0]=0;
            threadParameter[i][7]=i;
            pthread_cond_init(count_nonzero+i,NULL);
            pthread_mutex_init(threadLock+i,NULL);
            a= pthread_create( threadId+ i, NULL, (void* (*)(void *))serverSocket,(void *)(threadParameter[i]));
            if(a!=0)
            {
                perror("pthread_create error:");
                return -1;
            }
        }
        return 0;
    }

    static int  initListen(char*ip,int port,int listenMax)//初始化监听
    {   int a=0;
    int sockfd=socket(AF_INET,SOCK_STREAM,0);
    if(sockfd<0)
    {
        perror("sockt error:");
        close(sockfd);
        return -1;
    }
    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&(server_addr.sin_addr));
    server_addr.sin_port=htons(port);
    int opt = 1;
    setsockopt(sockfd, SOL_SOCKET,SO_REUSEADDR, (const void *) &opt, sizeof(opt));
    a=bind(sockfd,(struct sockaddr*)&server_addr,sizeof(server_addr));
    if(a<0)
    {
        perror("bind error:");
        close(sockfd);
        return -1;
    }
    a=listen(sockfd,listenMax);
    if(a<0)
    {
        perror("listen error:");
        close(sockfd);
        return -1;
    }
    return sockfd;
    }
    bool setNonBlock(int fd)//设置文件描述符为NonBlock
    {
        int flags = fcntl(fd, F_GETFL, 0);
        flags |= O_NONBLOCK;
        if(-1 == fcntl(fd, F_SETFL, flags))
        {
            return false;
        }
        return true;
    }
    int main()
    {
        int acceptSockfd=0;//accept返回的套接字
        int sockfd=0;//服务器套接字
        int nfds=0;//触发事件的个数
        socklen_t addrLen; //地址信息长度
        struct sockaddr_in clinetAddr; //IPv4地址结构
        if(0!=initThreadPool())   //初始化条件变量、互斥量、创建线程池
        {
            perror("initThreadPool error:");
            exit(-1);
        }
        sockfd=initListen(IP,PORT,LISTENMAX);  //创建监听socket
        ThreaDataPackage.sockNumber[0]=sockfd;
        if(sockfd<0)
        {
            perror("initListen error:");
            exit(-1);
        }
        ThreaDataPackage.epfd = epoll_create(8);//生成文件描述符
        ThreaDataPackage.ev.events = EPOLLIN | EPOLLET;//对应的文件描述符可读并且是et的epoll工作模式
        ThreaDataPackage.ev.data.fd =sockfd ;
        if (epoll_ctl(ThreaDataPackage.epfd , EPOLL_CTL_ADD,sockfd, &ThreaDataPackage.ev) < 0)//加入epoll事件集合
        {
            perror("epoll_ctl error:");
            exit(-1);
        }
        while(1)
        {
            nfds = epoll_wait(ThreaDataPackage.epfd , ThreaDataPackage.waitEvent, ThreaDataPackage.MAX+1, -1);
            printf("nfds::%d\n",nfds);
            for(int i=0;i<nfds;i++)
            {
                if((sockfd==ThreaDataPackage.waitEvent[i].data.fd)&&(EPOLLIN==ThreaDataPackage.waitEvent[i].events&EPOLLIN))
                {
                    addrLen=sizeof(struct sockaddr_in);
                    bzero(&clinetAddr,addrLen);
                    for(int j=0;j<LINET;j++)
                    {
                        if(ThreaDataPackage.sockNumber[j]==0)
                        {
                            ThreaDataPackage.sockNumber[j]= accept(sockfd, (struct sockaddr *)&clinetAddr, &addrLen);
                            if(ThreaDataPackage.sockNumber[j]<0)
                            {
                                perror("accept error:");
                                continue;
                            }
                            else
                            {
                                ThreaDataPackage.ev.data.fd = ThreaDataPackage.sockNumber[j];
                                ThreaDataPackage.ev.events = EPOLLIN|EPOLLET;
                                if (epoll_ctl(ThreaDataPackage.epfd , EPOLL_CTL_ADD,ThreaDataPackage.sockNumber[j], &ThreaDataPackage.ev) < 0)//加入epoll事件集合
                                {
                                    perror("epoll_ctl error:");
                                    exit(-1);
                                }
                                setNonBlock(ThreaDataPackage.sockNumber[j]);//设置为非阻塞
                                ThreaDataPackage.MAX++;
                                printf("客户端%d上线\n",ThreaDataPackage.MAX);
                                break;
                            }
                        }
                    }
                }
                else if(ThreaDataPackage.waitEvent[i].data.fd>3&&( EPOLLIN == ThreaDataPackage.waitEvent[i].events & (EPOLLIN|EPOLLERR)))
                {
                    for(int j=0;j<NUMBER;j++)
                    {
                        if(0==threadParameter[j][0])
                        {
                            threadParameter[j][0]=1;//设置活动标志为"活动"
                            threadParameter[j][1]=ThreaDataPackage.waitEvent[i].data.fd;//客户端的套接字
                            increment_count(j);
                            break;
                        }
                    }
                }
            }
        }
        return 0;
    }

    return 0;
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容