EventLoop类实现
事件循环类 主要包含了 Channel和Poller两个模块
EventLoop.h
#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
#include <functional>
#include <vector>
#include <atomic>
#include <unistd.h>
#include <memory>
#include <mutex>
class Channel;
class Poller;
// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable {
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
// 开启事件循环
void loop();
//退出事件循环
void quit();
Timestamp pollReturnTime() const {return pollReturnTime_;}
// 在当前loop中执行cb
void runInLoop(Functor cb);
//吧cb放入队列中,唤醒loop所在的线程,执行cb
void queueInLoop(Functor cb);
//唤醒loop所在的线程
void wakeup();
//EventLoop的方法 => Poller的方法
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
//判断EventLoop对象是否在自己的线程里面
bool isInLoopThread() const { return threadId_ == CurrentThread::tid();}
private:
void handleRead(); //wake up
void doPendingFunctors(); //执行回调
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_; //原子操作,通过CAS实现
std::atomic_bool quit_; // 标识退出loop循环
const pid_t threadId_; // 记录当前loop所在的id
Timestamp pollReturnTime_; //poller返回发生事件的channels的时间点
std::unique_ptr<Poller> poller_;
int wakeupFd_; // 当mainLoop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
std::atomic_bool callingPendingFunctors_; //表示当前loop是否有需要执行的回调操作
std::vector<Functor> pendingFunctors_; //存储loop需要执行的所有的回调操作
std::mutex mutex_; //互斥锁,用来保护上面vector容器的线程安全操作
};
EventLoop.cc
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>
// 防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;
//创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd() {
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(evtfd < 0) {
LOG_FATAL("eventfd error:%d \n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_)) {
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if(t_loopInThisThread) {
LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
} else {
t_loopInThisThread = this;
}
// 设置wakeupfd的事件类型及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// 每一个EventLoop都将监听wakeupchannel的EPOLLIN读事件了
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop(){
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
void EventLoop::loop() {
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_) {
activeChannels_.clear();
// 监听两类fd 一种是client的fd,一种是wakeupfd(mainloop唤醒subloop)
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for(Channel* channel : activeChannels_) {
// Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
// 执行当前EventLoop事件循环需要处理的回调操作
/*
IO线程 mainLoop accept fd => channel subloop
mainLoop 是先注册一个回调cb(需要subloop来执行) wakeup subloop后,执行下面的方法,就是之前mainLoop注册的cb操作
*/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping \n", this);
looping_ = false;
}
// 退出事件循环 1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit
void EventLoop::quit(){
quit_ = true;
//如果是在其他线程中调用的quit 在一个subloop(worker)中,调用了mainloop(IO)的quit
if(!isInLoopThread()) {
wakeup();
}
}
// 在当前loop中执行cb
void EventLoop::runInLoop(Functor cb){
if(isInLoopThread()) { // 在当前的loop线程中,执行cb
cb();
}else { // 在非当前loop线程中执行cb,那就需要唤醒loop所在线程执行cb
queueInLoop(cb);
}
}
//把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
// 唤醒相应的,需要执行上面回调操作的loop线程了
/*
callingPendingFunctors_为true代表当前loop正在执行回调,但loop又有了新的回调
*/
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup(); // 唤醒loop所在线程
}
}
//唤醒loop所在的线程 向wakeupfd_写一个数据,wakeupChannel就会发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup(){
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if(n != sizeof one) {
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
}
//EventLoop的方法 => Poller的方法
void EventLoop::updateChannel(Channel* channel){
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel){
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel* channel){
return poller_->hasChannel(channel);
}
//执行回调
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor &functor : functors) {
functor(); // 执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}
void EventLoop::handleRead() {
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one) {
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8", n);
}
}
关于上一篇文章的newDefaultPoller实现
#include "Poller.h"
#include "EPollPoller.h"
#include <stdlib.h>
Poller* Poller::newDefaultPoller(EventLoop* loop) {
if(::getenv("MUDUO_USE_POLL")) {
return nullptr; //生成poll的实例
} else {
return new EPollPoller(loop); // 生成epoll的实例
}
}