前言
Reactor类实现了Pistache的线程模型,我在《Pistache 并发处理过程》一文中大概描述了其执行的流程,其实Pistache所用的方式,就是《Netty 系列之 Netty 线程模型》一文中提到的 “Reactor 多线程模型”。
我先分析一下这种方法的好处。
对于一个web服务器而言,是基于TCP + HTTP协议的。TCP与UDP不同之处在于,在使用TCP进行数据传送之前,必须要在客户端和服务端之间建立TCP连接,然后再进行数据传输,过程及涉及的系统调用如下图所示:
- accept()
如果任何客户端没有进行connect()操作,那么执行accept()将会阻塞 - read()
如果客户端没有通过write()发送数据(即HTTP请求,HTTP请求就是遵循了HTTP协议的消息),那么执行read()将会阻塞
我们先分析,传统的方法,即以Tomcat的BIO方式:
- 首先,由一个主线程来监听server-fd(listen-fd)
- 执行accept()阻塞等待新连接,当新连接到达时创建client-fd
- 从线程池中选择一个工作线程,讲client-fd交给线程处理,然后执行循环执行2、3
- 工作线程执行read(),阻塞等待HTTP请求的到达,然后解析、处理、封装HTTP请求,发送响应
- 如果是长连接(一般都是长连接),那么将循环执行4,知道断开连接,释放线程
这种模式最大缺点在于,每一个线程处理一个连接,这样就会严重的限制,同时处理的连接数,因为线程池是不能太大的。
因此epoll的IO复用机制对这个问题会很有帮助,基于epoll,每个线程可以处理多个连接:
- 同上
- 同上
- 同上
- 线程将client-fd添加到自己的维护的epoll中
- 当epoll监听的client-fd有先请求到达时,会返回对应的client-fd,然后线程来处理(解析、处理、封装HTTP请求,发送响应),处理完毕之后,继续使用epoll监听
这就是《Netty 系列之 Netty 线程模型》一文中提到的 “Reactor 多线程模型”,Tomcat的NIO模式略有不同,那就是在第5步,NIO会指派一个新的线程来执行HTTP操作((解析、处理、封装HTTP请求,发送响应)),从而具有更高的并发性(见tomcat8的并发参数控制)。
除此之外,《Netty 系列之 Netty 线程模型》还提到了单线程模型,即全程只需要一个线程搭配一个epoll,同时处理新连接和http请求。
Pistache在Reactor 多线程模型的基础之上,进行了一个改进,那就是,实现了异步写响应,见Pistache源码分析 —— 异步写机制。异步写响应的最大好处就是避免让线程阻塞在write()操作上,比如,如果当前网络比较拥挤,TCP的写入缓冲区满了,这时候执行write()会阻塞,影响处理其他连接。
而epoll的最大优势就是避免IO阻塞!!NIO的名字就是Non-Blocking IO。
然而要将并发提升到极致,仅仅依靠epoll机制实现非阻塞是不够的,必须要实现IO任务和CPU任务的并发!比如Nodejs的libuv使用线程池来处理file I/O,而这方面的王者就是Go语言。关于并发,后面会出一个详细的思考。这里就不再展开了。
Reactor类
Reactor类同时实现了单线程模型和多线程模型,在pistache的实现中,分别将其称之为同步模型和异步模型。无论是那种实现,每个线程的核心工作就是执行epoll的监听,当事件到达时,调用Transport 类的接口执行对应的处理函数。
一、epoll
每个线程都维护自己的epoll,而epoll监听的fd分为两类:
- client-fd这种动态创建的文件描述符
- eventfd这种为了实现PollableQueue静态创建的文件描述副
对于后者,只需要在为reactor对象添加transport时,注册到epoll中即可,这就是Transport 类中的:
void registerPoller(Polling::Epoll& poller) override;
见Pistache源码分析 —— Transport类中的第7小节。
而对于前者,则需要Reactor类实现对于epoll的管理,这样当新连接到达时,主线程将通过Transport 类调用Reactor类实现注册函数,将其添加到epoll中,即Transport 类中的:
void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
{
int fd = peer->fd();
peers.insert(std::make_pair(fd, peer));
peer->associateTransport(this);
handler_->onConnection(peer);
reactor()->registerFd(key(), fd, NotifyOn::Read | NotifyOn::Shutdown,
Polling::Mode::Edge);
}
当然,真正执行handlePeer()的是工作线程,主线程只是讲peer对象,放到了transport的PeerQueue中,这里请参阅Pistache源码分析 —— Server的初始化和请求处理
相关成员函数
-
注册到epoll
void registerFd(const Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level);
void registerFd(const Key& key, Fd fd, Polling::NotifyOn interest, Polling::Mode mode = Polling::Mode::Level);
-
注册到epoll(使用EPOLLNESHOT flags,见linux手册翻译——epoll_ctl(2))
void registerFdOneShot(const Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level);
void registerFdOneShot(const Key& key, Fd fd, Polling::NotifyOn interest, Polling::Mode mode = Polling::Mode::Level);
-
修改关注的事件或者在EPOLLNESHOT后重新回复对fd的关注
void modifyFd(const Key& key, Fd fd, Polling::NotifyOn interest, Polling::Mode mode = Polling::Mode::Level);
void modifyFd(const Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level);
-
从epoll中移除对fd的监控
void removeFd(const Key& key, Fd fd);
以后成员函数重载函数的区别在于是否提供tag的值,如果不提供,则默认是Polling::Tag(fd)
,当然这也不是最终的值,还需要融合key值,见下:
二、Handler(transport)
我们之前的系列文章中反复提到,在设计上,一个reactor对象可以拥有多种transport,或者说一个worker线程可以拥有多种handler。而在基于多线程的模型设计中,每个reactor可以拥有多个线程,因此每个线程都拥有每个不同的transport的clone对象。举个例子,比如有N个线程,M种transport,那么整个系统将会保留N×M个transport对象,换句话说,可以认为每个线程都维护了一个transport数组,且每一种transport在各个线程的transport数组都会有一份clone,且他们的索引值是相同的。
这里的每种transport,其实就是通过Transport 类或者其子类,通过构造函数创建的对象;每个对象再通过调用clone函数,为每一个线程分类一个克隆体,实现见下:
这就存在几个问题:
- 当新的连接到达,主线程需要将新连接对应的peer对象添加到reactor(或者说线程)的epoll中,这是通过transport提供的接口实现的,主线程确定如何改用线程中的那个transport呢?
- 当线程的epoll上的事件发生时,该调用那个transport处理事件?
解决方案就是Transport的基类中的 Reactor::Key key_
字段
问题 1
我们是在Listener类的bind()函数中,初始化了reactor并添加了transport:
auto transport = transportFactory_();
reactor_.init(Aio::AsyncContext(workers_, workersName_));
transportKey = reactor_.addHandler(transport);
auto transport = transportFactory_();
用于创建一个transport对象;
reactor_.init(Aio::AsyncContext(workers_, workersName_));
init是Reactor的初始化函数,将在下面展开;
transportKey = reactor_.addHandler(transport);
添加初始化函数,并返回一个key
值,保存到Listener类的transportKey
字段中;
而主线程正是通过transportKey,定位到是那一种transport。即在Listener类的dispatchPeer()函数中:
void Listener::dispatchPeer(const std::shared_ptr<Peer>& peer)
{
auto handlers = reactor_.handlers(transportKey);
auto idx = peer->fd() % handlers.size();
auto transport = std::static_pointer_cast<Transport>(handlers[idx]);
transport->handleNewPeer(peer);
}
reactor_.handlers(transportKey);
返回了transportKey对应的所有线程所拥有的clone的transport,这些transport都是同一“种”,对应的key值也完全相同。因此从这些transport中挑选一个来处理新连接就等价于挑选一个线程。
问题2:
我们可以看到上面Reactor类中操作epoll的成员函数的第一个参数都是const Key& key
,当注册epoll时,会将此key值写入到epoll的tag中(这里的tag是pistache对epoll的epoll_event.data
字段的封装,见linux手册翻译——epoll_ctl(2)).当收到事件后,通过解析tag就能找到对应的tansport
但是,前面说过,对于静态的fd,是由通过tansport.registerPoller
注册的:
void Transport::registerPoller(Polling::Epoll& poller)
{
writesQueue.bind(poller);
timersQueue.bind(poller);
peersQueue.bind(poller);
notifier.bind(poller);
}
这里并没有使用到key值,这里可以认为是个设计上的缺陷,因为从实现上,其实自始至终只有一个transport对象,其key值固定为0,即线程的transport数组的索引值,因此并不会导致reactor找错transport对象,我们将在实现部分详细说明。
相关成员函数
-
Key addHandler(const std::shared_ptr<Handler>& handler);
向reactor中添加transport对象,并返回transport的key值,或者说其索引值(0、1、2、3 ....) -
std::vector<std::shared_ptr<Handler>> handlers(const Key& key);
根据key值返回transport列表,对于单线程模型,仅会返回1个对于多线程模型,返回的handler个数同线程个数
三、实现
Reactor定义了一个实现的类:
class Impl;
以及该类的成员变量和gettter函数:
Impl* impl() const;
std::unique_ptr<Impl> impl_;
几乎所有的成员函数都是通过调用Impl类的接口实现,如:
Reactor::Key Reactor::addHandler(const std::shared_ptr<Handler>& handler)
{
return impl()->addHandler(handler, true);
}
Impl有两种实现,即前面提到的同步实现和异步实现,分别对应于单线程模型和多线程模型。
Reactor类的init函数用于初始化impl_以指定使用那种实现:
void Reactor::init()
{
SyncContext context;
init(context);
}
void Reactor::init(const ExecutionContext& context)
{
impl_.reset(context.makeImpl(this));
}
ExecutionContext类帮助我们生成特定的实现:
class ExecutionContext
{
public:
virtual ~ExecutionContext() = default;
virtual Reactor::Impl* makeImpl(Reactor* reactor) const = 0;
};
class SyncContext : public ExecutionContext
{
public:
~SyncContext() override = default;
Reactor::Impl* makeImpl(Reactor* reactor) const override;
};
class AsyncContext : public ExecutionContext
{
public:
explicit AsyncContext(size_t threads, const std::string& threadsName = "")
: threads_(threads)
, threadsName_(threadsName)
{ }
~AsyncContext() override = default;
Reactor::Impl* makeImpl(Reactor* reactor) const override;
static AsyncContext singleThreaded();
private:
size_t threads_;
std::string threadsName_;
};
我们从代码上可以得知,异步实现需要提供线程数和线程名两个参数。
此时回头看我们在 第二节 问题1 中截取的Listener类的bind()代码:
auto transport = transportFactory_();
reactor_.init(Aio::AsyncContext(workers_, workersName_));
transportKey = reactor_.addHandler(transport);
3.1 Reactor::Impl
这是个基类,可以理解为接口的概念,定义了实现Impl需要实现的功能,所有的功能都有Reactor相应的成员函数调用:
class Reactor::Impl
{
public:
Impl(Reactor* reactor)
: reactor_(reactor)
{ }
virtual ~Impl() = default;
virtual Reactor::Key addHandler(const std::shared_ptr<Handler>& handler,
bool setKey)
= 0;
[[nodiscard]] virtual std::vector<std::shared_ptr<Handler>>
handlers(const Reactor::Key& key) const = 0;
virtual void registerFd(const Reactor::Key& key, Fd fd,
Polling::NotifyOn interest, Polling::Tag tag,
Polling::Mode mode = Polling::Mode::Level)
= 0;
virtual void registerFdOneShot(const Reactor::Key& key, Fd fd,
Polling::NotifyOn interest, Polling::Tag tag,
Polling::Mode mode = Polling::Mode::Level)
= 0;
virtual void modifyFd(const Reactor::Key& key, Fd fd,
Polling::NotifyOn interest, Polling::Tag tag,
Polling::Mode mode = Polling::Mode::Level)
= 0;
virtual void removeFd(const Reactor::Key& key, Fd fd) = 0;
virtual void runOnce() = 0;
virtual void run() = 0;
virtual void shutdown() = 0;
Reactor* reactor_;
};
3.2 同步实现 class SyncImpl : public Reactor::Impl
成员变量
-
Polling::Epoll poller;
主心骨了 -
std::atomic<bool> shutdown_;
和NotifyFd shutdownFd;
用于终止服务 -
HandlerList handlers_;
用于存放transport对象,HandlerList可以理解为是一个Transport数组
成员函数
- 操作poller
void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest,Polling::Tag tag,Polling::Mode mode = Polling::Mode::Level) override
void registerFdOneShot(const Reactor::Key& key, Fd fd,Polling::NotifyOn interest, Polling::Tag tag,Polling::Mode mode = Polling::Mode::Level) override
void modifyFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest,Polling::Tag tag,Polling::Mode mode = Polling::Mode::Level) override
void removeFd(const Reactor::Key& /*key*/, Fd fd) override
这些函数都是调用了pistache封装的epoll的相关操作,在调用之前都会将key值和tag的值编码后保存到epoll_event.data
中,编码的实现函数定义在HandlerList中:
// We are using the highest 8 bits of the fd to encode the index of the
// handler, which gives us a maximum of 2**8 - 1 handler, 255
static constexpr size_t HandlerBits = 8;
static constexpr size_t HandlerShift = sizeof(uint64_t) - HandlerBits;
static constexpr uint64_t DataMask = uint64_t(-1) >> HandlerBits;
static Polling::Tag encodeTag(const Reactor::Key& key, uint64_t value)
{
auto index = key.data();
// The reason why we are using the most significant bits to encode
// the index of the handler is that in the fast path, we won't need
// to shift the value to retrieve the fd if there is only one handler as
// all the bits will already be set to 0.
auto encodedValue = (index << HandlerShift) | value;
return Polling::Tag(encodedValue);
}
可以看到,编码的方式为高8位存放key,即transport的索引,低56位存放fd,这样就限制了handler的个数最多为255个。此外注意代码中的注释,因为高8位是index,而当仅有一个transport时,其索引值就是0,这样话就不需要解码了,将在后面再谈此问题。
-
操作handler
-
Reactor::Key addHandler(const std::shared_ptr<Handler>& handler, bool setKey = true) override;
- 执行transport的
registerPoller()
函数,将其需要监控的fd添加到poller
中 - 添加一个handler到
handlers_
中,这里调用了HandlerList的add()函数,函数将返回其索引值作为key值 - 返回key值
- 执行transport的
-
std::shared_ptr<Handler> handler(const Reactor::Key& key) const
根据key值,返回handler -
handlers(const Reactor::Key& key) const override
根据key值,返回handler的容器,因为是单线程模型,因此只能返回一个元素的容器
-
-
开始执行
-
void runOnce() override
开始执行epoll循环 - void run() override
将当前线程的tid赋值给transport的context_
字段,然后调用runOnce()
-
void handleFds(std::vector<Polling::Event> events) const
处理epoll事件
void handleFds(std::vector<Polling::Event> events) const
{
// Fast-path: if we only have one handler, do not bother scanning the fds to
// find the right handlers
if (handlers_.size() == 1)
handlers_.at(0)->onReady(FdSet(std::move(events)));
else
{
std::unordered_map<std::shared_ptr<Handler>, std::vector<Polling::Event>>
fdHandlers;
// 因为每个事件的tag是由index和fd组成
// 在这里我们根据index将所有的event分类,让handler处理自己的
for (auto& event : events)
{
size_t index;
uint64_t value;
std::tie(index, value) = decodeTag(event.tag);
auto handler_ = handlers_.at(index);
auto& evs = fdHandlers.at(handler_);
evs.push_back(std::move(event));
}
for (auto& data : fdHandlers)
{
data.first->onReady(FdSet(std::move(data.second)));
}
}
}
参数events
是触发的事件集合。
可以看到,如果当前仅有一个handler,那么他的索引值就是0,这样他的Polling::Tag的值就等于fd,因此不需要解码,就可以直接调用 handlers_.at(0)->onReady(FdSet(std::move(events)));
,这里从实现上解释了本文第二节中问题2中提出的问难
类关系图
感觉都不需要解释,我画的真的很清楚了。
3.3 异步实现 class AsyncImpl : public Reactor::Impl
AsyncImpl是基于SyncImpl实现的,他们的之间的关系图,如下:
异步实现,添加了Woker结构:
struct Worker
{
explicit Worker(Reactor* reactor, const std::string& threadsName)
: thread()
, sync(new SyncImpl(reactor))
, threadsName_(threadsName)
{ }
~Worker()
{
if (thread.joinable())
thread.join();
}
void run()
{
thread = std::thread([=]() {
if (!threadsName_.empty())
{
pthread_setname_np(pthread_self(),
threadsName_.substr(0, 15).c_str());
}
sync->run();
});
}
void shutdown() { sync->shutdown(); }
std::thread thread;
std::unique_ptr<SyncImpl> sync;
std::string threadsName_;
};
正如图所示,Worker中包含一个线程和一个Syml。因此Asyml提供的接口都是通过调用worker的sync的接口实现的,这里就引出了一个问题:
Asyml提供的接口都是由transport调用的,那么transport如果知道自己所在的worker呢?
图中结构已经给出了答案,就是将transport的key值,设计为 (Handler Index + Worker Index)
成员函数
-
编码解码
-
static Reactor::Key encodeKey(const Reactor::Key& originalKey, uint32_t value)
originalKey
就是我们在前面一直认为的transport的值,即handler index;而value
就是Worker Index。编码方式为:
* [ handler idx ] [ worker idx ] * ------------------------ ---------------------------- * ^ 32 bits ^ 32 bits * ----------------------------------------------------- * ^ 64 bits
-
static std::pair<uint32_t, uint32_t> decodeKey(const Reactor::Key& encodedKey)
解码的结果将返回一个std::pair<uint32_t, uint32_t>
-
-
操作poller
void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level) override
void registerFdOneShot(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level) override
void modifyFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level) override
void removeFd(const Reactor::Key& key, Fd fd) override
在异步实现中,我们需要通过
const Reactor::Key& key
来确定要操作那个poller,以及将同步实现的返回的key值作为参数,调用相应的功能,以registerFd为例:void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, Polling::Tag tag, Polling::Mode mode = Polling::Mode::Level) override { dispatchCall(key, &SyncImpl::registerFd, fd, interest, tag, mode); }
我们查看dispatchCall()函数的实现:
template <typename Func, typename... Args> void dispatchCall(const Reactor::Key& key, Func func, Args&&... args) const { auto decoded = decodeKey(key); const auto& wrk = workers_.at(decoded.second); Reactor::Key originalKey(decoded.first); (wrk->sync.get()->*(func)) (originalKey, std::forward<Args>(args)...); }
其实很简单,首先解码,获取worker Index和handler index,前者用于拿到worker,后者作为调用worker的sync的对应函数的参数,详细见同步实现。
操作handler
-
Reactor::Key addHandler(const std::shared_ptr<Handler>& handler, bool) override
Reactor::Key addHandler(const std::shared_ptr<Handler>& handler, bool) override { std::array<Reactor::Key, SyncImpl::MaxHandlers()> keys; for (size_t i = 0; i < workers_.size(); ++i) { auto& wrk = workers_.at(i); auto cl = handler->clone(); auto key = wrk->sync->addHandler(cl, false /* setKey */); auto newKey = encodeKey(key, static_cast<uint32_t>(i)); cl->key_ = newKey; keys.at(i) = key; } auto data = keys.at(0).data() << 32 | KeyMarker; return Reactor::Key(data); }
我们将worker Index 和通过
sync->addHandler(cl, false /* setKey */);
返回的handler Index,通过encodeKey()
函数编码得到key值,并将其复制给Transport::key_
但是注意最后addHandler()返回的key值是Transport::key_ << 32 | KeyMarker;
,见我画的示意图。 -
std::vector<std::shared_ptr<Handler>> handlers(const Reactor::Key& key) const override
std::vector<std::shared_ptr<Handler>> handlers(const Reactor::Key& key) const override { const std::pair<uint32_t, uint32_t> idx_marker = decodeKey(key); if (idx_marker.second != KeyMarker) throw std::runtime_error("Invalid key"); Reactor::Key originalKey(idx_marker.first); std::vector<std::shared_ptr<Handler>> res; res.reserve(workers_.size()); for (const auto& wrk : workers_) { res.push_back(wrk->sync->handler(originalKey)); } return res; }
- 开始执行
void run() override
{
for (auto& wrk : workers_)
wrk->run();
}
在最后分析一下构造函数吧:
AsyncImpl(Reactor* reactor, size_t threads, const std::string& threadsName)
: Reactor::Impl(reactor)
{
if (threads > SyncImpl::MaxHandlers())
throw std::runtime_error("Too many worker threads requested (max "s + std::to_string(SyncImpl::MaxHandlers()) + ")."s);
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back(std::make_unique<Worker>(reactor, threadsName));
}
这里进行了一个判断:if (threads > SyncImpl::MaxHandlers())
,我们看一下SyncImpl::MaxHandlers()
:
static constexpr size_t HandlerBits = 8;
static constexpr size_t HandlerShift = sizeof(uint64_t) - HandlerBits;
static constexpr uint64_t DataMask = uint64_t(-1) >> HandlerBits;
static constexpr size_t MaxHandlers = (1 << HandlerBits) - 1;
static constexpr size_t MaxHandlers() { return HandlerList::MaxHandlers; }
这是在Sync中定义的,我们在前面说过,在传递给epoll的data数据,高8位是handler Index,而低56位是fd,因此在这里我们其实是限制了handler的数目,最多为255个。但是用这个数目来限制线程的数目显然是缺乏依据的!因为handler和线程数目压根就没有任何关系,当前设计的线程的理论上限应该是worker Index,即应该是2^32。