反应器模式+epoll 实现服务器

1. 什么是reactor模式

reactor模式是 主线程只负责监听文件描述符上面是否有事件发生,如果有的话,就立即将事件通知工作线程 或 处理新连接的线程。 除此之外,主线程不做任何实质性的事情。

特点:
  1. 尽可能多的使用了c++11的语法
  2. 基于反应器模式的服务器
  3. 使用了epoll来应对高并发
2. reactor + epoll 是实现一个服务器

以下是核心的伪代码:

//主线程一直在这里死循环
while (!pReatcor->m_bStop)
{
    //step1 : 调用epoll_wait,判断有没有监听的时间发生 
    struct epoll_event ev[1024];
    int n = epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);
    if (n == 0)
        continue;
    else if (n < 0)
    {
        std::cout << "epoll_wait error" << std::endl;
        continue;
    }

    //step2 : 如果有监听到有事件发生
    int m = min(n, 1024);
    for (int i = 0; i < m; ++i){
        //如果事件是 新的连接,那么通知 接受连接请求的工作线程去完成
        if (ev[i].data.fd == pReatcor->m_listenfd)
            pReatcor->m_acceptcond.notify_one();
        //如果事件是 read/write ,那么通知 读写socket的工作线程去完成
        else
        {
            {
                std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
                pReatcor->m_listClients.push_back(ev[i].data.fd);
            }
            pReatcor->m_workercond.notify_one();
            //std::cout << "signal" << std::endl;
        }// end if
    }// end for-loop
}// end while

下面是接受请求的线程的代码:

void CMyReactor::accept_thread_proc(CMyReactor* pReatcor){
    std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;
    while (true){
        int newfd;
        struct sockaddr_in clientaddr;
        socklen_t addrlen;
        {
            //accept_thread 启动了之后,如果没有请求到就会阻塞在这里
            std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);
            pReatcor->m_acceptcond.wait(guard);   //主线程调用notify之后,accept_thread知道有新的连接,然后开始accept
            if (pReatcor->m_bStop)
                break;
            newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
        }//释放锁

        if (newfd == -1)
            continue;
        std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" 
        << ::ntohs(clientaddr.sin_port) << std::endl;

        //将新socket设置为non-blocking
        int oldflag = ::fcntl(newfd, F_GETFL, 0);
        int newflag = oldflag | O_NONBLOCK;
        if (::fcntl(newfd, F_SETFL, newflag) == -1){
            std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
            continue;
        }

        //将socket加入epoll的监听
        struct epoll_event e;
        memset(&e, 0, sizeof(e));
        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
        e.data.fd = newfd;
        if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
        {
            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
        }
    }
    std::cout << "accept thread exit ..." << std::endl;
}

下面是读写的工作线程:

void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
{
    std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;
    while (true)
    {
        //拿到一个可读的句柄
        int clientfd;
        {
            //工作线程在这里加锁,从m_listClients里面读取一个可读的socket
            //m_listClients 里面的元素就是主线程把可读的socket放入这个list
            std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
            while (pReatcor->m_listClients.empty()){
                if (pReatcor->m_bStop){
                    std::cout << "worker thread exit ..." << std::endl;
                    return;
                }       
                pReatcor->m_workercond.wait(guard);
            }
            clientfd = pReatcor->m_listClients.front();
            pReatcor->m_listClients.pop_front();
        }
        
        //拿到句柄之后,开始读写
        //开始读
        std::cout << std::endl;  //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
        std::string strclientmsg;
        char buff[256];
        bool bError = false;
        while (true)
        {
            memset(buff, 0, sizeof(buff));
            int nRecv = ::recv(clientfd, buff, 256, 0);
            if (nRecv == -1)
            {
                if (errno == EWOULDBLOCK)
                    break;
                else
                {
                    std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
                    pReatcor->close_client(clientfd);
                    bError = true;
                    break;
                }

            }
            //对端关闭了socket,这端也关闭。
            else if (nRecv == 0){
                std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
                pReatcor->close_client(clientfd);
                bError = true;
                break;
            }
            strclientmsg += buff;
        }//end 读while
        //出错了,就不要再继续往下执行了
        if (bError)
            continue;
        std::cout << "client msg: " << strclientmsg;

        //开始写
        while (true)
        {
            int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
            if (nSent == -1)
            {
                if (errno == EWOULDBLOCK)
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    continue;
                }
                else
                {
                    std::cout << "send error, fd = " << clientfd << std::endl;
                    pReatcor->close_client(clientfd);
                    break;
                }
            }
            std::cout << "send: " << strclientmsg;
            strclientmsg.erase(0, nSent);
            if (strclientmsg.empty())
                break;
        }//end 写while
    }//end while
}
主要逻辑框架
源码

https://github.com/GreenGitHuber/code_something/tree/master/epoll%2Breactor%E6%A8%A1%E5%BC%8F%E7%9A%84%E6%9C%8D%E5%8A%A1%E5%99%A8

other

如果觉得有点难懂,可以先看下面的文章:
使用epoll开发服务器 :https://www.jianshu.com/p/0998e9ebfec3

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容

  • @synthesize和@dynamic分别有什么作用?@property有两个对应的词,一个是 @synthes...
    笔笔请求阅读 515评论 0 1
  • 【Aipm引导页】 https://58976235.wodemo.net/down/20170514/44034...
    Mr_洛寒阅读 2,581评论 3 5
  • (开始) (标题)iApc(/标题)(链接)https://duming666.wodemo.net/down/2...
    独名阅读 1,541评论 1 3
  • 一切都过的好快,转眼间下周就是春节,一个阖家团圆的日子。不过对我来说阖家团圆什么的更像是个笑话,如同画饼充饥一样中...
    夏野阅读 148评论 0 0