Preview
本模块是ceph中各子系统相互通信及对外服务的模块,采用的是异步通信,ceph实现了POSIX、dpdk、rdma三种技术,本文只分析POSIX+epoll模型
前置知识
- 熟悉网络通信的基本编程知识,比如bind、listen、accept等,
- 熟悉epoll异步事件模型的编程知识
总体思路
和信号处理、日志模块差不多,都是先起几个用于处理网络通信的线程,将要监听的事件注册进去
源码分析
关键类
Messenger
AsyncMessenger
NetworkStack
PosixNetworkStack
启动
入口函数为Messenger::create,这是个类的静态函数,所以可以直接调用,从这个函数的实现可以看出只支持Async模式了
Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
uint64_t nonce, uint64_t cflags)
{
int r = -1;
if (type == "random") {
r = 0;
//r = ceph::util::generate_random_number(0, 1);
}
if (r == 0 || type.find("async") != std::string::npos)
return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return nullptr;
}
继承关系:Messenger -> SimplePolicyMessenger -> AsyncMessenger
StackSingleton,主要用来实现stack的单例化
struct StackSingleton {
CephContext *cct;
std::shared_ptr<NetworkStack> stack;
explicit StackSingleton(CephContext *c): cct(c) {}
// stack没有初始化,就初始化它
void ready(std::string &type) {
if (!stack)
stack = NetworkStack::create(cct, type);
}
~StackSingleton() {
stack->stop();
}
};
AsyncMessenger的构造函数
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
const std::string &type, string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name), // 初始化父类,没有啥需要特别强调的
dispatch_queue(cct, this, mname),
nonce(_nonce)
{
std::string transport_type = "posix";
...
// new a single NetworkStack object
auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
"AsyncMessenger::NetworkStack::" + transport_type, true, cct); // 这里是获取StackSingleton的全局单例对象
// 初始化stack
single->ready(transport_type);
stack = single->stack.get();
stack->start();
local_worker = stack->get_worker();
local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
local_worker, true, true);
init_local_connection();
reap_handler = new C_handle_reap(this);
unsigned processor_num = 1;
if (stack->support_local_listen_table())
processor_num = stack->get_num_worker();
for (unsigned i = 0; i < processor_num; ++i)
processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
stack实例化 NetworkStack::create,静态类函数,和上面套路是一样的,在这里创建PosixNetworkStack的实例
继承关系:NetworkStack -> PosixNetworkStack
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
{
if (t == "posix")
return std::make_shared<PosixNetworkStack>(c, t);
...
}
PosixNetworkStack的构造函数
PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
: NetworkStack(c, t) // 初始化父类
{
}
NetworkStack的构造函数,主要的逻辑是集中在这里的
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
ceph_assert(cct->_conf->ms_async_op_threads > 0);
const int InitEventNumber = 5000;
// limit max threads num
num_workers = cct->_conf->ms_async_op_threads; //num_workers是指通讯线程的数量
if (num_workers >= EventCenter::MAX_EVENTCENTER) {
num_workers = EventCenter::MAX_EVENTCENTER; // 即使支持更多的线程也不需要
}
// 创建num_workers个Worker,一个worker对应这一个通信线程,也就是说Worker是线程的包装类
for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
Worker *w = create_worker(cct, type, worker_id);
int ret = w->center.init(InitEventNumber, worker_id, type);
if (ret)
throw std::system_error(-ret, std::generic_category());
将Worker加入到workers中,workers就是保存已有Worker的
workers.push_back(w);
}
}
NetworkStack::create_worker,在这里面实例化PosixWorker
继承关系:Worker -> PosixWorker
Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id)
{
if (type == "posix")
return new PosixWorker(c, worker_id);
....
}
PosixWorker的构造函数
PosixWorker(CephContext *c, unsigned i)
: Worker(c, i) /*初始化父类*/, net(c) {}
Worker 的构造函数,唯一重要的是这里初始化了EventCenter类型的成员center,这个是事件的总控
Worker(CephContext *c, unsigned worker_id)
: cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
...
}
回到NetworkStack::NetworkStack中,这时进行下一步
int ret = w->center.init(InitEventNumber, worker_id, type);
EventCenter::init
int EventCenter::init(int nevent, unsigned center_id, const std::string &type)
{
// can't init multi times
ceph_assert(this->nevent == 0);
this->type = type;
this->center_id = center_id;
...
#ifdef HAVE_EPOLL
driver = new EpollDriver(cct); // 初始化EpollDriver,这个EpollDriver主要提供epoll相关功能,下面详解
...
}
...
int r = driver->init(this, nevent);
...
file_events.resize(nevent);
this->nevent = nevent;
...
// 下面的操作就是创建一个管道
// 这里的epoll会阻塞式的监听直到超时或者有事件到来,如果我们需要注册新事件这种操作就需要唤醒它,唤醒的方式就是增加一个
// 管道监听,当我们需要唤醒的时候就往管道里写数据
int fds[2];
if (pipe_cloexec(fds, 0) < 0) {
...
}
notify_receive_fd = fds[0];
notify_send_fd = fds[1];
r = net.set_nonblock(notify_receive_fd);
...
r = net.set_nonblock(notify_send_fd);
...
return r;
}
EpollDriver的构造函数没啥重要的
EpollDriver::init,这里面会进行创建epoll和设置epoll描述符属性的操作
int EpollDriver::init(EventCenter *c, int nevent)
{
// 分配内存,大小为nevent
events = (struct epoll_event*)calloc(nevent, sizeof(struct epoll_event));
// epoll是不用指明事件数的
epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
...
if (::fcntl(epfd, F_SETFD, FD_CLOEXEC) == -1) {
...
}
this->nevent = nevent;
return 0;
}
回到AsyncMessenger::AsyncMessenger中,进行下一步
stack = single->stack.get();
stack->start();
NetworkStack::start 这一步就是就是将线程运行起来
void NetworkStack::start()
{
...
for (unsigned i = 0; i < num_workers; ++i) {
if (workers[i]->is_init())
continue;
// 获取通信线程要运行的函数
std::function<void ()> thread = add_thread(i);
// 将拿到的函数注入到通信线程中
spawn_worker(i, std::move(thread));
}
started = true;
lk.unlock();
// 等待线程完成对Worker的初始化
for (unsigned i = 0; i < num_workers; ++i)
workers[i]->wait_for_init();
}
// 返回一个匿名函数,用于通信线程运行
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
// 获取对应的Worker
Worker *w = workers[worker_id];
return [this, w]() {
char tp_name[16];
// 字符串拼接
sprintf(tp_name, "msgr-worker-%u", w->id);
// 设置线程的名字
ceph_pthread_setname(pthread_self(), tp_name);
// epoll等待的超时时间
const unsigned EventMaxWaitUs = 30000000;
// 这一步设置owner是用来以后判断当前执行的线程是不是该Worker对应线程的
w->center.set_owner();
// 空函数
w->initialize();
// Worker完成初始化
w->init_done();
// 一直循环,直到需要停止(即w->done为真)
while (!w->done) {
...
ceph::timespan dur;
int r = w->center.process_events(EventMaxWaitUs, &dur);
...
w->reset();
w->destroy();
};
}
void spawn_worker(unsigned i, std::function<void ()> &&func) override {
threads.resize(i+1);
threads[i] = std::thread(func);
}
重点来了哈,w->center.process_events,在每个线程中会一直循环调用这个方法,用以监听、处理事件
引用知乎大佬的话:
接下来是while循环,来执行EventCenter::process_events。这里事件有三类
- time_events:定时事件,比如connection的定时函数AsyncConnection::tick。
- external_events:外部事件,比如要发送消息时,send_message就是外部事件。
- 可读事件:epoll监听的事件,比如socket连接的对端发来的消息。
EventCenter::process_events的逻辑比较明显,分为两部分:超时监听,回调事件注册函数。
超时一般设置为30秒,但是并不固定,与external_events和time_events有关。如果有external_events,则超时时间为0,即不阻塞等待,先去处理external_events;如果有time_events,根据定时的时间和30秒来确定超时时间。超时时间确定后,就开始等待。
// 所有的事件都会包含一个回调函数,处理事件其实就是去执行这个函数
int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *working_dur)
{
// 自行查询详细用法,相当于个计时器,从创建这个对象开始计时
struct timeval tv;
// 记录处理的事件数
int numevents;
bool trigger_time = false;
auto now = clock_type::now();
clock_type::time_point end_time = now + std::chrono::microseconds(timeout_microseconds);
// 当有定时任务时,根据定时时间来确定超时时间
auto it = time_events.begin();
if (it != time_events.end() && end_time >= it->first) {
trigger_time = true;
end_time = it->first;
if (end_time > now) {
timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end_time - now).count();
} else {
timeout_microseconds = 0;
}
}
// 当有外部事件时,epoll为非阻塞式等待
bool blocking = pollers.empty() && !external_num_events.load();
if (!blocking)
timeout_microseconds = 0;
tv.tv_sec = timeout_microseconds / 1000000;
tv.tv_usec = timeout_microseconds % 1000000;
ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
// 这个是用来记录发生事件的fd和mask的
vector<FiredFileEvent> fired_events;
// 这个函数比较简单,就是epoll wait,判断时间类型,不过多解释
numevents = driver->event_wait(fired_events, &tv);
auto working_start = ceph::mono_clock::now();
// 循环处理事件
for (int event_id = 0; event_id < numevents; event_id++) {
int rfired = 0;
FileEvent *event;
EventCallbackRef cb;
event = _get_file_event(fired_events[event_id].fd);
/* note the event->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 处理可读事件
if (event->mask & fired_events[event_id].mask & EVENT_READABLE) {
rfired = 1;
cb = event->read_cb;
// 回调函数
cb->do_request(fired_events[event_id].fd);
}
// 处理可写事件
if (event->mask & fired_events[event_id].mask & EVENT_WRITABLE) {
if (!rfired || event->read_cb != event->write_cb) {
cb = event->write_cb;
cb->do_request(fired_events[event_id].fd);
}
}
ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[event_id].fd
<< " mask is " << fired_events[event_id].mask << dendl;
}
// 处理定时任务
if (trigger_time)
numevents += process_time_events();
// 处理外部事件
if (external_num_events.load()) {
external_lock.lock();
deque<EventCallbackRef> cur_process;
cur_process.swap(external_events);
external_num_events.store(0);
external_lock.unlock();
numevents += cur_process.size();
while (!cur_process.empty()) {
EventCallbackRef e = cur_process.front();
ldout(cct, 30) << __func__ << " do " << e << dendl;
e->do_request(0);
cur_process.pop_front();
}
}
if (!numevents && !blocking) {
for (uint32_t i = 0; i < pollers.size(); i++)
numevents += pollers[i]->poll();
}
// 记录处理耗时
if (working_dur)
*working_dur = ceph::mono_clock::now() - working_start;
return numevents;
}
回到AsyncMessenger::AsyncMessenger中,进行下一步
// 找个负载最低的线程做本地链接线程
local_worker = stack->get_worker();
local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
local_worker, true, true);
init_local_connection();
AsyncConnection的构造函数,这里就没啥好强调的了
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
Worker *w, bool m2, bool local)
: Connection(cct, m), // 初始化父类
delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()),
state(STATE_NONE), port(-1),
dispatch_queue(q), recv_buf(NULL),
recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
msgr2(m2), state_offset(0),
worker(w), center(&w->center),read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
this->interceptor = m->interceptor;
#endif
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
write_callback_handler = new C_handle_write_callback(this);
wakeup_handler = new C_time_wakeup(this);
tick_handler = new C_tick_wakeup(this);
// double recv_max_prefetch see "read_until"
recv_buf = new char[2*recv_max_prefetch];
if (local) {
protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
} else if (m2) {
protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
} else {
protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
}
logger->inc(l_msgr_created_connections);
}
init_local_connection
void init_local_connection() {
std::lock_guard l{lock};
local_connection->is_loopback = true;
_init_local_connection();
}
_init_local_connection
void _init_local_connection() {
ceph_assert(ceph_mutex_is_locked(lock));
local_connection->peer_addrs = *my_addrs;
local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
// 感觉这一步没啥用呀,还没有运行add_dispatcher_tail呢,以后看懂了再填坑~
ms_deliver_handle_fast_connect(local_connection.get());
}
bind+listen
就是正常的启动一个网络服务器的步骤
入口函数:AsyncMessenger:: bindv
int AsyncMessenger:: bindv(const entity_addrvec_t &bind_addrs)
{
...
// bind to a socket
set<int> avoid_ports;
entity_addrvec_t bound_addrs;
unsigned i = 0;
for (auto &&p : processors) {
// 开始bind,在Processors中进行
int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
if (r) {
// Note: this is related to local tcp listen table problem.
// Posix(default kernel implementation) backend shares listen table
// in the kernel, so all threads can use the same listen table naturally
// and only one thread need to bind. But other backends(like dpdk) uses local
// listen table, we need to bind/listen tcp port for each worker. So if the
// first worker failed to bind, it could be think the normal error then handle
// it, like port is used case. But if the first worker successfully to bind
// but the second worker failed, it's not expected and we need to assert
// here
// 大概的意思是如果是起了多个线程,这些线程是共享同一个listen表的,也就是你在一个线程中bind了地址开始listen,
// 那么你可以在其他线程中进行accept
ceph_assert(i == 0);
return r;
}
++i;
}
_finish_bind(bind_addrs, bound_addrs);
return 0;
}
Processor::bind
int Processor::bind(const entity_addrvec_t &bind_addrs,
const set<int>& avoid_ports,
entity_addrvec_t* bound_addrs)
{
const auto& conf = msgr->cct->_conf;
SocketOptions opts;
// 这两项ceph中有对应的配置,大概的意思就是处理这些请求是否异步
opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; // 是否异步
opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; // 缓存这些异步请求的buffer大小
// 一个模块会起多个端口监听,比如mds就会起两个端口监听请求(一个对外一个对内,osd好像是8个)
listen_sockets.resize(bind_addrs.v.size());
*bound_addrs = bind_addrs;
for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
auto& listen_addr = bound_addrs->v[k];
/* bind to port */
int r = -1;
// 重试机制
for (int i = 0; i < conf->ms_bind_retry_count; i++) {
...
// 获取一个可用的端口
if (listen_addr.get_port()) {
// 核心就是调用这个submit_to,submit_to会去判断是否是当前线程是否就是该Processor中center对应的线程,
// 如果不是就将这个bind的作为一个外部事件注册,这个匿名函数就是回调函数,并唤醒对应的线程,如果是就执行这个匿名函数
// 如果always_async为真,那不管是不是当前线程都以事件的形式执行
worker->center.submit_to(
worker->center.get_id(),
[this, k, &listen_addr, &opts, &r]() {
r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
}, false);
...
}
...
}
return 0;
}
注册外部事件用EventCenter::dispatch_event_external函数,注意:这时已经是调用Processor对应的Worker了
void EventCenter::dispatch_event_external(EventCallbackRef e)
{
uint64_t num = 0;
{
// 把事件加入external_events
std::lock_guard lock{external_lock};
if (external_num_events > 0 && *external_events.rbegin() == e) {
return;
}
external_events.push_back(e);
num = ++external_num_events;
}
if (num == 1 && !in_thread())
// 唤醒线程
wakeup();
ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
}
不管是异步还是非异步都是执行哪个匿名函数
this, k, &listen_addr, &opts, &r]() {
r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
}
也就是调用对应Worker的listen函数,这里就简单了,就是普通的网络编程操作
int PosixWorker::listen(entity_addr_t &sa,
unsigned addr_slot,
const SocketOptions &opt,
ServerSocket *sock)
{
// 获取socket的fd
int listen_sd = net.create_socket(sa.get_family(), true);
...
// 设置非阻塞
int r = net.set_nonblock(listen_sd);
...
r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
...
// 将socket和地址绑定
r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
...
// 开始listen
r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
...
// 记录所有的监听socket
*sock = ServerSocket(
std::unique_ptr<PosixServerSocketImpl>(
new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
return 0;
}
为listen的fd创建对应的事件
以mds为例,在MDSDaemon::init()中会调用下面两个
messenger->add_dispatcher_tail(&beacon);
messenger->add_dispatcher_tail(this);
void add_dispatcher_tail(Dispatcher *d)
{
bool first = dispatchers.empty();
dispatchers.push_back(d);
if (d->ms_can_fast_dispatch_any())
fast_dispatchers.push_back(d);
// 如果是第一个加入dispatchers的就执行
if (first)
ready();
}
这里是调用的AsyncMessenger::ready()
void AsyncMessenger::ready()
{
...
// POSIX中这个ready为空函数
stack->ready();
// 如果还没有bind,就先去bind
if (pending_bind) {
int err = bindv(pending_bind_addrs);
...
}
std::lock_guard l{lock};
// 每个Processor都执行start函数,在POSIX模式下,Processor只有一个
for (auto &&p : processors)
p->start();
dispatch_queue.start();
}
和上面bind时一样,切换到Processor对应的worker中,执行这个匿名函数
void Processor::start()
{
ldout(msgr->cct, 1) << __func__ << dendl;
// start thread
worker->center.submit_to(worker->center.get_id(), [this]() {
// 还记得我们bind的完成之后是吧listen_fd包装之后保存到了listen_sockets中吧
for (auto& listen_socket : listen_sockets) {
if (listen_socket) {
if (listen_socket.fd() == -1) {
ldout(msgr->cct, 1) << __func__
<< " Error: processor restart after listen_socket.fd closed. "
<< this << dendl;
return;
}
// 主要是执行这个函数,这里是为我们bind的fd创建对应的事件
worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
listen_handler); }
}
}, false);
}
这里说明下这个listen_handler最终是调用Processor::accept(),详细可以看看代码实现很简单
EventCenter::create_file_event
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
ceph_assert(in_thread());
int r = 0;
if (fd >= nevent) {
// 以2的倍数增加,是常见的处理方法
int new_size = nevent << 2;
while (fd >= new_size)
new_size <<= 2;
ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
// epoll是不需要这一步的
r = driver->resize_events(new_size);
if (r < 0) {
lderr(cct) << __func__ << " event count is exceed." << dendl;
return -ERANGE;
}
file_events.resize(new_size);
nevent = new_size;
}
// 获取对应索引元素的指针
EventCenter::FileEvent *event = _get_file_event(fd);
...
// 相等就说明是已经创建过了
if (event->mask == mask)
return 0;
// 这里就是将事件加入到epoll监听中,没啥好强调的
r = driver->add_event(fd, event->mask, mask);
...
// 将回调函数加入事件中,在事件发生的时候好调用
event->mask |= mask;
if (mask & EVENT_READABLE) {
event->read_cb = ctxt;
}
if (mask & EVENT_WRITABLE) {
event->write_cb = ctxt;
}
ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
<< " original mask is " << event->mask << dendl;
return 0;
}
Processor::accept()
void Processor::accept() {
SocketOptions opts;
opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
opts.priority = msgr->get_socket_priority();
for (auto &listen_socket: listen_sockets) {
ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
<< dendl;
unsigned accept_error_num = 0;
while (true) {
entity_addr_t addr;
ConnectedSocket cli_socket;
Worker *w = worker;
// 如果不支持share listen table,则选一个低负载的Worker
if (!msgr->get_stack()->support_local_listen_table())
w = msgr->get_stack()->get_worker();
else
++w->references;
int r = listen_socket.accept(&cli_socket, opts, &addr, w);
if (r == 0) {
...
msgr->add_accept(
w, std::move(cli_socket),
msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
addr);
accept_error_num = 0;
continue;
} else {
// 错误处理
}
}
}
}
// 都是正常的设置操作
int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
ceph_assert(sock);
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen); // 非阻塞式
...
int r = handler.set_nonblock(sd);
...
r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
...
out->set_type(addr_type);
out->set_sockaddr((sockaddr*)&ss);
handler.set_priority(sd, opt.priority, out->get_family());
std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
*sock = ConnectedSocket(std::move(csi));
return 0;
}
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
const entity_addr_t &listen_addr,
const entity_addr_t &peer_addr) {
std::lock_guard l{lock};
auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
listen_addr.is_msgr2(), false);
// 主要进行两个操作,1.更改AsyncConnection的状态,2.再次将事件加入监听
conn->accept(std::move(cli_socket), listen_addr, peer_addr);
// 用来记录当前连接
accepting_conns.insert(conn);
}
后面就是基于AsyncConnection做各种信息验证,验证完成后就可以读写数据了