2023-11-14 muduo库学习之仿写代码7 EventLoop类实现

EventLoop类实现

事件循环类 主要包含了 Channel和Poller两个模块

EventLoop.h

#pragma once

#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"

#include <functional>
#include <vector>
#include <atomic>
#include <unistd.h>
#include <memory>
#include <mutex>


class Channel;
class Poller;

// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable {
public:
    using Functor = std::function<void()>;
    EventLoop();
    ~EventLoop();

    // 开启事件循环
    void loop();
    //退出事件循环
    void quit();

    Timestamp pollReturnTime() const {return pollReturnTime_;}

    // 在当前loop中执行cb
    void runInLoop(Functor cb);
    //吧cb放入队列中,唤醒loop所在的线程,执行cb
    void queueInLoop(Functor cb);

    //唤醒loop所在的线程
    void wakeup();

    //EventLoop的方法 => Poller的方法
    void updateChannel(Channel* channel);
    void removeChannel(Channel* channel);
    bool hasChannel(Channel* channel);

    //判断EventLoop对象是否在自己的线程里面
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid();}
private:

    void handleRead(); //wake up
    void doPendingFunctors(); //执行回调


    using ChannelList = std::vector<Channel*>;

    std::atomic_bool looping_; //原子操作,通过CAS实现
    std::atomic_bool quit_; // 标识退出loop循环
    
    const pid_t threadId_; // 记录当前loop所在的id
    Timestamp pollReturnTime_; //poller返回发生事件的channels的时间点
    std::unique_ptr<Poller> poller_;

    int wakeupFd_; // 当mainLoop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel
    std::unique_ptr<Channel> wakeupChannel_;

    ChannelList activeChannels_;

    std::atomic_bool callingPendingFunctors_; //表示当前loop是否有需要执行的回调操作
    std::vector<Functor> pendingFunctors_; //存储loop需要执行的所有的回调操作
    std::mutex mutex_; //互斥锁,用来保护上面vector容器的线程安全操作
};

EventLoop.cc

#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"

#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>

// 防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;

// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;

//创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd() {
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if(evtfd < 0) {
        LOG_FATAL("eventfd error:%d \n", errno);
    }
    return evtfd;
}

EventLoop::EventLoop() 
    : looping_(false)
    , quit_(false)
    , callingPendingFunctors_(false)
    , threadId_(CurrentThread::tid())
    , poller_(Poller::newDefaultPoller(this))
    , wakeupFd_(createEventfd())
    , wakeupChannel_(new Channel(this, wakeupFd_)) {
        LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
        if(t_loopInThisThread) {
            LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
        } else {
            t_loopInThisThread = this;
        }

        // 设置wakeupfd的事件类型及发生事件后的回调操作
        wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
        // 每一个EventLoop都将监听wakeupchannel的EPOLLIN读事件了
        wakeupChannel_->enableReading();
}

EventLoop::~EventLoop(){
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

void EventLoop::loop() {
    looping_ = true;
    quit_ = false;

    LOG_INFO("EventLoop %p start looping \n", this);

    while(!quit_) {
        activeChannels_.clear();
        // 监听两类fd 一种是client的fd,一种是wakeupfd(mainloop唤醒subloop)
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        for(Channel* channel : activeChannels_) {
            // Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
            channel->handleEvent(pollReturnTime_);
        }
        // 执行当前EventLoop事件循环需要处理的回调操作
        /*
        IO线程 mainLoop accept fd => channel subloop
        mainLoop 是先注册一个回调cb(需要subloop来执行) wakeup subloop后,执行下面的方法,就是之前mainLoop注册的cb操作
        */
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop looping \n", this);
    looping_ = false;

}

// 退出事件循环 1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit
void EventLoop::quit(){
    quit_ = true;

    //如果是在其他线程中调用的quit 在一个subloop(worker)中,调用了mainloop(IO)的quit
    if(!isInLoopThread()) {
        wakeup();
    }
}

// 在当前loop中执行cb
void EventLoop::runInLoop(Functor cb){
    if(isInLoopThread()) { // 在当前的loop线程中,执行cb
        cb();
    }else { // 在非当前loop线程中执行cb,那就需要唤醒loop所在线程执行cb
        queueInLoop(cb);
    }
}
//把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb){
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }

    // 唤醒相应的,需要执行上面回调操作的loop线程了
    /*
    callingPendingFunctors_为true代表当前loop正在执行回调,但loop又有了新的回调
    */
    if (!isInLoopThread() || callingPendingFunctors_) {
        wakeup(); // 唤醒loop所在线程
    }
}

//唤醒loop所在的线程 向wakeupfd_写一个数据,wakeupChannel就会发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup(){
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if(n != sizeof one) {
        LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
    }
}

//EventLoop的方法 => Poller的方法
void EventLoop::updateChannel(Channel* channel){
    poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel* channel){
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel* channel){
    return poller_->hasChannel(channel);
}

//执行回调
void EventLoop::doPendingFunctors(){
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for(const Functor &functor : functors) {
        functor(); // 执行当前loop需要执行的回调操作
    }

    callingPendingFunctors_ = false;
} 





void EventLoop::handleRead() {
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);

    if(n != sizeof one) {
        LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8", n);
    }
}

关于上一篇文章的newDefaultPoller实现

#include "Poller.h"
#include "EPollPoller.h"

#include <stdlib.h>

Poller* Poller::newDefaultPoller(EventLoop* loop) {
    if(::getenv("MUDUO_USE_POLL")) {
        return nullptr; //生成poll的实例
    } else {
        return new EPollPoller(loop); // 生成epoll的实例
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容