msg

Preview

This module is how Ceph's subsystems communicate with each other and serve external clients. Communication is asynchronous. Ceph implements three transports: POSIX, DPDK, and RDMA; this article only analyzes the POSIX + epoll model.

Prerequisites

  1. Basic network programming knowledge, e.g. bind, listen, accept
  2. Familiarity with epoll-based asynchronous event programming

Overall approach

Much like the signal-handling and logging modules, the idea is to start a few threads dedicated to network I/O and register the events to be watched with them.

Source code analysis

Key classes

Messenger
AsyncMessenger
NetworkStack
PosixNetworkStack

Startup

The entry point is Messenger::create. It is a static member function, so it can be called directly. From its implementation you can see that only the Async mode is supported now.

Messenger *Messenger::create(CephContext *cct, const string &type,
                 entity_name_t name, string lname,
                 uint64_t nonce, uint64_t cflags)
{
  int r = -1;
  if (type == "random") {
    r = 0;
    //r = ceph::util::generate_random_number(0, 1);
  }
  if (r == 0 || type.find("async") != std::string::npos)
    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
  lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
  return nullptr;
}
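
For orientation, here is a hedged usage sketch showing how a daemon might call this factory. It follows only the signature shown above; the concrete arguments (entity_name_t::CLIENT(-1), the "client" label, using the pid as the nonce) are illustrative and assume the usual Ceph globals, not any specific daemon's code.

// Hypothetical usage sketch following the signature above (needs the Ceph headers/context).
Messenger *msgr = Messenger::create(g_ceph_context,            // CephContext*
                                    "async+posix",              // ms_type: must contain "async"
                                    entity_name_t::CLIENT(-1),  // who we are
                                    "client",                   // logical name
                                    getpid(),                   // nonce: unique per process
                                    0);                         // cflags
if (!msgr) {
  // anything other than an "async*" type returns nullptr
}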

Inheritance chain: Messenger -> SimplePolicyMessenger -> AsyncMessenger

StackSingleton, which mainly exists to make the stack a singleton:

struct StackSingleton {
  CephContext *cct;
  std::shared_ptr<NetworkStack> stack;

  explicit StackSingleton(CephContext *c): cct(c) {}
  // if the stack has not been created yet, create it
  void ready(std::string &type) {
    if (!stack)
      stack = NetworkStack::create(cct, type);
  }
  ~StackSingleton() {
    stack->stop();
  }
};
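
lookup_or_create_singleton_object (used in the constructor below) returns one instance per name per CephContext. As a rough standalone illustration of that lookup-or-create idea, assuming nothing about Ceph's real bookkeeping, one could write:

#include <map>
#include <memory>
#include <mutex>
#include <string>

// Minimal stand-in for "one named singleton per context": the first caller for a
// given key constructs the object, every later caller gets back the same instance.
template <typename T>
std::shared_ptr<T> lookup_or_create_singleton(const std::string &key)
{
  static std::mutex m;
  static std::map<std::string, std::shared_ptr<T>> registry;
  std::lock_guard<std::mutex> l(m);
  auto &slot = registry[key];
  if (!slot)
    slot = std::make_shared<T>();   // created once, shared afterwards
  return slot;
}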

AsyncMessenger's constructor

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name), // initialize the base class; nothing here needs special attention
    dispatch_queue(cct, this, mname),
    nonce(_nonce)
{
  std::string transport_type = "posix";
  ...
  // new a single NetworkStack object
  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct); // 这里是获取StackSingleton的全局单例对象
  // 初始化stack
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
                     local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}

The stack is instantiated in NetworkStack::create, another static class function following the same pattern as above; this is where the PosixNetworkStack instance is created.
Inheritance chain: NetworkStack -> PosixNetworkStack

std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
{
  if (t == "posix")
    return std::make_shared<PosixNetworkStack>(c, t);
    ...
}

PosixNetworkStack's constructor

PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
    : NetworkStack(c, t) // initialize the base class
{
}

NetworkStack's constructor, where the main logic is concentrated

NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
  ceph_assert(cct->_conf->ms_async_op_threads > 0);

  const int InitEventNumber = 5000;
  // limit max threads num
  num_workers = cct->_conf->ms_async_op_threads;  // num_workers is the number of communication threads
  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
    num_workers = EventCenter::MAX_EVENTCENTER; // no need for more threads even if more were supported
  }
  
  // create num_workers Workers; each Worker corresponds to one communication thread, i.e. Worker is the wrapper class for a thread
  for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
    Worker *w = create_worker(cct, type, worker_id);
    int ret = w->center.init(InitEventNumber, worker_id, type);
    if (ret)
      throw std::system_error(-ret, std::generic_category());
    // add the Worker to workers, the container that keeps all existing Workers
    workers.push_back(w);
  }
}

NetworkStack::create_worker, which instantiates the PosixWorker.
Inheritance chain: Worker -> PosixWorker

Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id)
{
  if (type == "posix")
    return new PosixWorker(c, worker_id);
  ....
}

PosixWorker's constructor

PosixWorker(CephContext *c, unsigned i)
      : Worker(c, i) /* initialize the base class */, net(c) {}

Worker's constructor. The only important part is that it initializes the EventCenter member center, which is the central controller for events.

Worker(CephContext *c, unsigned worker_id)
    : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
    ...
  }

Back in NetworkStack::NetworkStack, the next step is

int ret = w->center.init(InitEventNumber, worker_id, type);

EventCenter::init

int EventCenter::init(int nevent, unsigned center_id, const std::string &type)
{
  // can't init multi times
  ceph_assert(this->nevent == 0);

  this->type = type;
  this->center_id = center_id;

  ...
#ifdef HAVE_EPOLL
  driver = new EpollDriver(cct); // create the EpollDriver, which provides the epoll-related functionality; details below
  ...
  }
  ...

  int r = driver->init(this, nevent);
  ...

  file_events.resize(nevent);
  this->nevent = nevent;
  ...
  // the following creates a pipe
  // epoll here blocks until it times out or an event arrives; when we need to register a new event (or similar), we must wake it up.
  // The trick is to also watch a pipe: whenever we need a wakeup, we simply write some data into the pipe
  int fds[2];
  if (pipe_cloexec(fds, 0) < 0) {
    ...
  }

  notify_receive_fd = fds[0];
  notify_send_fd = fds[1];
  r = net.set_nonblock(notify_receive_fd);
  ...
  r = net.set_nonblock(notify_send_fd);
  ...

  return r;
}
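
This is the classic "self-pipe" wakeup trick. Below is a standalone sketch of the mechanism (not Ceph code): the pipe's read end sits in the epoll set, so a thread blocked in epoll_wait can be interrupted from another thread simply by writing one byte to the write end.

#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <chrono>
#include <cstdio>
#include <thread>

int notify_fds[2];   // [0] = read end, watched by epoll; [1] = write end

// Called from any other thread to interrupt a blocking epoll_wait.
void wakeup()
{
  char c = 'c';
  (void)::write(notify_fds[1], &c, 1);
}

int main()
{
  if (::pipe(notify_fds) < 0)
    return 1;
  ::fcntl(notify_fds[0], F_SETFL, O_NONBLOCK);
  ::fcntl(notify_fds[1], F_SETFL, O_NONBLOCK);

  int epfd = ::epoll_create(1024);               // the size is only a hint
  struct epoll_event ev{};
  ev.events = EPOLLIN;
  ev.data.fd = notify_fds[0];
  ::epoll_ctl(epfd, EPOLL_CTL_ADD, notify_fds[0], &ev);

  // Simulate another thread registering an external event one second later.
  std::thread waker([] {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    wakeup();
  });

  // Would otherwise block for the full 30 s timeout; the write above wakes it early.
  struct epoll_event fired[16];
  int n = ::epoll_wait(epfd, fired, 16, 30000);
  for (int i = 0; i < n; ++i) {
    if (fired[i].data.fd == notify_fds[0]) {
      char buf[256];
      while (::read(notify_fds[0], buf, sizeof(buf)) > 0)
        ;                                        // drain the pipe; the wakeup carries no data
      printf("woken up, now handle the newly registered events\n");
    }
  }
  waker.join();
  ::close(epfd);
  return 0;
}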

EpollDriver's constructor contains nothing important.

EpollDriver::init, which creates the epoll instance and sets the attributes of the epoll descriptor

int EpollDriver::init(EventCenter *c, int nevent)
{
  // allocate memory for nevent epoll_event entries
  events = (struct epoll_event*)calloc(nevent, sizeof(struct epoll_event));
  // epoll does not need to be told the number of events up front
  epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
  ...
  if (::fcntl(epfd, F_SETFD, FD_CLOEXEC) == -1) {
    ...
  }

  this->nevent = nevent;

  return 0;
}

Back in AsyncMessenger::AsyncMessenger, the next step is

stack = single->stack.get();
stack->start();

NetworkStack::start, the step that actually gets the threads running

void NetworkStack::start()
{
    ...
  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    // get the function the communication thread will run
    std::function<void ()> thread = add_thread(i);
    // hand that function to the communication thread
    spawn_worker(i, std::move(thread));
  }
  started = true;
  lk.unlock();
  // wait for the threads to finish initializing their Workers
  for (unsigned i = 0; i < num_workers; ++i)
    workers[i]->wait_for_init();
}

// return a lambda that the communication thread will run
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
  // fetch the corresponding Worker
  Worker *w = workers[worker_id];
  return [this, w]() {
      char tp_name[16];
      // build the thread name
      sprintf(tp_name, "msgr-worker-%u", w->id);
      // set the thread's name
      ceph_pthread_setname(pthread_self(), tp_name);
      // epoll wait timeout (30 s, in microseconds)
      const unsigned EventMaxWaitUs = 30000000;
      // set_owner records which thread owns this Worker, so we can later check
      // whether the currently executing thread is that Worker's thread
      w->center.set_owner();
      // empty function
      w->initialize();
      // the Worker has finished initializing
      w->init_done();
      // keep looping until asked to stop (i.e. w->done becomes true)
      while (!w->done) {
        ...
        ceph::timespan dur;
        int r = w->center.process_events(EventMaxWaitUs, &dur);
        ...
      }
      w->reset();
      w->destroy();
  };
}

void spawn_worker(unsigned i, std::function<void ()> &&func) override {
    threads.resize(i+1);
    threads[i] = std::thread(func);
  }
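
The start / wait_for_init sequence above is a standard "spawn the thread, then block until it reports ready" handshake (spawn_worker simply creates the std::thread). Here is a simplified standalone sketch of that pattern, using a condition variable in place of Ceph's Worker bookkeeping:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct Worker {
  unsigned id = 0;
  bool init = false;
  std::atomic<bool> done{false};
  std::mutex lock;
  std::condition_variable cond;

  // called by the worker thread once its event loop is set up
  void init_done() {
    std::lock_guard<std::mutex> l(lock);
    init = true;
    cond.notify_all();
  }
  // called by the spawning thread (NetworkStack::start in the real code)
  void wait_for_init() {
    std::unique_lock<std::mutex> l(lock);
    cond.wait(l, [this] { return init; });
  }
};

int main()
{
  const unsigned num_workers = 3;
  std::vector<Worker> workers(num_workers);
  std::vector<std::thread> threads;

  for (unsigned i = 0; i < num_workers; ++i) {
    workers[i].id = i;
    Worker *w = &workers[i];
    threads.emplace_back([w] {
      // ... per-thread setup: set the thread name, take ownership of the event center ...
      w->init_done();
      while (!w->done) {
        // the real loop calls process_events() here; a sleep keeps the demo cheap
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    });
  }

  for (auto &w : workers)
    w.wait_for_init();          // returns only after every worker finished initializing
  printf("all %u workers initialized\n", num_workers);

  for (auto &w : workers)
    w.done = true;              // ask the workers to stop
  for (auto &t : threads)
    t.join();
  return 0;
}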

Now for the key part: w->center.process_events. Every thread calls this method in a loop to wait for and handle events.

Quoting a write-up by an expert on Zhihu:

Next comes the while loop that runs EventCenter::process_events. There are three kinds of events here:

  • time_events: timer events, e.g. a connection's periodic function AsyncConnection::tick.
  • external_events: external events, e.g. when a message needs to be sent, send_message is an external event.
  • readable events: events watched by epoll, e.g. a message arriving from the peer of a socket connection.

The logic of EventCenter::process_events is fairly clear and splits into two parts: waiting with a timeout, then invoking the registered event callbacks.

The timeout is usually 30 seconds, but it is not fixed; it depends on external_events and time_events. If there are external_events, the timeout is 0, i.e. do not block at all and handle the external_events first; if there are time_events, the timeout is derived from the timer deadlines and the 30-second cap. Once the timeout is determined, the wait begins.
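
Condensed into a standalone helper, that timeout decision looks roughly like this (a sketch only; the container type and parameter names are simplified placeholders, not the real EventCenter fields):

#include <chrono>
#include <cstdint>
#include <map>

using clock_type = std::chrono::steady_clock;

// Pick the epoll timeout: start from the caller's cap (30 s in the worker loop),
// shrink it to the earliest pending time event, and drop to 0 when external
// events are already queued so the loop does not block at all.
uint64_t choose_timeout_us(uint64_t timeout_us,
                           const std::multimap<clock_type::time_point, int> &time_events,
                           bool has_external_events)
{
  auto now = clock_type::now();
  auto end_time = now + std::chrono::microseconds(timeout_us);

  auto it = time_events.begin();
  if (it != time_events.end() && end_time >= it->first) {
    end_time = it->first;               // the earliest timer fires before the cap
    timeout_us = end_time > now
      ? std::chrono::duration_cast<std::chrono::microseconds>(end_time - now).count()
      : 0;
  }
  if (has_external_events)
    timeout_us = 0;                     // external events take priority over waiting
  return timeout_us;
}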

// every event carries a callback; handling an event really just means executing that callback
int EventCenter::process_events(unsigned timeout_microseconds,  ceph::timespan *working_dur)
{
  // timeout value handed to the event driver (epoll_wait) below
  struct timeval tv;
  // counts how many events were handled
  int numevents;
  bool trigger_time = false;
  auto now = clock_type::now();
  clock_type::time_point end_time = now + std::chrono::microseconds(timeout_microseconds);

  // when there are timer tasks, derive the timeout from the earliest timer deadline
  auto it = time_events.begin();
  if (it != time_events.end() && end_time >= it->first) {
    trigger_time = true;
    end_time = it->first;

    if (end_time > now) {
      timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end_time - now).count();
    } else {
      timeout_microseconds = 0;
    }
  }
  // when there are external events, epoll waits without blocking
  bool blocking = pollers.empty() && !external_num_events.load();
  if (!blocking)
    timeout_microseconds = 0;
  tv.tv_sec = timeout_microseconds / 1000000;
  tv.tv_usec = timeout_microseconds % 1000000;

  ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
  // records the fd and mask of each event that fired
  vector<FiredFileEvent> fired_events;
  // this function is simple: it is just epoll_wait plus mapping the event types; no further explanation needed
  numevents = driver->event_wait(fired_events, &tv);
  auto working_start = ceph::mono_clock::now();
  // loop over and handle the fired events
  for (int event_id = 0; event_id < numevents; event_id++) {
    int rfired = 0;
    FileEvent *event;
    EventCallbackRef cb;
    event = _get_file_event(fired_events[event_id].fd);

    /* note the event->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    // handle the readable event
    if (event->mask & fired_events[event_id].mask & EVENT_READABLE) {
      rfired = 1;
      cb = event->read_cb;
      // invoke the callback
      cb->do_request(fired_events[event_id].fd);
    }
    // handle the writable event
    if (event->mask & fired_events[event_id].mask & EVENT_WRITABLE) {
      if (!rfired || event->read_cb != event->write_cb) {
        cb = event->write_cb;
        cb->do_request(fired_events[event_id].fd);
      }
    }

    ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[event_id].fd
                   << " mask is " << fired_events[event_id].mask << dendl;
  }

  // handle timer events
  if (trigger_time)
    numevents += process_time_events();
  // handle external events
  if (external_num_events.load()) {
    external_lock.lock();
    deque<EventCallbackRef> cur_process;
    cur_process.swap(external_events);
    external_num_events.store(0);
    external_lock.unlock();
    numevents += cur_process.size();
    while (!cur_process.empty()) {
      EventCallbackRef e = cur_process.front();
      ldout(cct, 30) << __func__ << " do " << e << dendl;
      e->do_request(0);
      cur_process.pop_front();
    }
  }

  if (!numevents && !blocking) {
    for (uint32_t i = 0; i < pollers.size(); i++)
      numevents += pollers[i]->poll();
  }
  // record how long the processing took
  if (working_dur)
    *working_dur = ceph::mono_clock::now() - working_start;
  return numevents;
}

Back in AsyncMessenger::AsyncMessenger, the next step is

// pick the least-loaded worker to host the local connection
local_worker = stack->get_worker();
local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
                     local_worker, true, true);
init_local_connection();

AsyncConnection's constructor; nothing here needs special emphasis

AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w, bool m2, bool local)
  : Connection(cct, m), // initialize the base class
    delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()),
    state(STATE_NONE), port(-1),
    dispatch_queue(q), recv_buf(NULL),
    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
    inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
    msgr2(m2), state_offset(0),
    worker(w), center(&w->center),read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
  this->interceptor = m->interceptor;
#endif
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  write_callback_handler = new C_handle_write_callback(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  // double recv_max_prefetch see "read_until"
  recv_buf = new char[2*recv_max_prefetch];
  if (local) {
    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
  } else if (m2) {
    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
  } else {
    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
  }
  logger->inc(l_msgr_created_connections);
}

init_local_connection

void init_local_connection() {
    std::lock_guard l{lock};
    local_connection->is_loopback = true;
    _init_local_connection();
  }

_init_local_connection

void _init_local_connection() {
    ceph_assert(ceph_mutex_is_locked(lock));
    local_connection->peer_addrs = *my_addrs;
    local_connection->peer_type = my_name.type();
    local_connection->set_features(CEPH_FEATURES_ALL);
    // this step doesn't seem to do anything yet, since add_dispatcher_tail hasn't run at this point; I'll fill this in once I understand it
    ms_deliver_handle_fast_connect(local_connection.get());
}

bind+listen

These are just the normal steps for starting a network server.

Entry point: AsyncMessenger::bindv

int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
  ...
  // bind to a socket
  set<int> avoid_ports;
  entity_addrvec_t bound_addrs;
  unsigned i = 0;
  for (auto &&p : processors) {
    // do the bind; this happens inside the Processor
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      // Note: this is related to local tcp listen table problem.
      // Posix(default kernel implementation) backend shares listen table
      // in the kernel, so all threads can use the same listen table naturally
      // and only one thread need to bind. But other backends(like dpdk) uses local
      // listen table, we need to bind/listen tcp port for each worker. So if the
      // first worker failed to bind, it could be think the normal error then handle
      // it, like port is used case. But if the first worker successfully to bind
      // but the second worker failed, it's not expected and we need to assert
      // here
      // Roughly: when several threads are started they share the same kernel listen table, i.e. once one thread has
      // bound an address and started listening, the other threads can accept on it as well
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  return 0;
}

Processor::bind

int Processor::bind(const entity_addrvec_t &bind_addrs,
            const set<int>& avoid_ports,
            entity_addrvec_t* bound_addrs)
{
  const auto& conf = msgr->cct->_conf;

  SocketOptions opts;
  // both of these map to Ceph config options; they are plain TCP socket options
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;  // TCP_NODELAY (disable Nagle's algorithm)
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;  // SO_RCVBUF, the socket receive buffer size
  // a daemon may listen on several ports; for example the MDS listens on two (one external, one internal; the OSD seems to use eight)
  listen_sockets.resize(bind_addrs.v.size());
  *bound_addrs = bind_addrs;

  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    auto& listen_addr = bound_addrs->v[k];

    /* bind to port */
    int r = -1;
    // retry loop
    for (int i = 0; i < conf->ms_bind_retry_count; i++) {
      ...
      // get a usable port
      if (listen_addr.get_port()) {
        // The core is this submit_to call: submit_to checks whether the current thread is the one that owns this Processor's center.
        // If it is not, the bind is registered as an external event (this lambda is the callback) and the owning thread is woken up;
        // if it is, the lambda is executed directly. If always_async is true, it is run as an event regardless of the calling thread.
    worker->center.submit_to(
      worker->center.get_id(),
      [this, k, &listen_addr, &opts, &r]() {
        r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
      }, false);
    ...
    }
    ...
  }
  return 0;
}
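
To make the submit_to behaviour concrete, here is a standalone sketch of the same dispatch decision, with a toy event loop standing in for EventCenter. The class and function names here are invented for the sketch, and the details (e.g. how the wait is implemented) differ from Ceph's real C_submit_event machinery.

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Toy stand-in for EventCenter: one owner thread draining a queue of callbacks.
class EventLoop {
  std::atomic<std::thread::id> owner{};
  std::deque<std::function<void()>> q;
  std::mutex m;
  std::condition_variable cv;
  bool stop = false;

public:
  bool in_thread() const { return std::this_thread::get_id() == owner.load(); }

  void dispatch_external(std::function<void()> fn) {  // "register external event + wakeup()"
    std::lock_guard<std::mutex> l(m);
    q.push_back(std::move(fn));
    cv.notify_one();
  }

  void run() {                                        // the worker thread's loop
    owner.store(std::this_thread::get_id());
    std::unique_lock<std::mutex> l(m);
    while (!stop) {
      cv.wait(l, [this] { return stop || !q.empty(); });
      while (!q.empty()) {
        auto fn = std::move(q.front());
        q.pop_front();
        l.unlock();
        fn();
        l.lock();
      }
    }
  }

  void shutdown() {
    std::lock_guard<std::mutex> l(m);
    stop = true;
    cv.notify_one();
  }
};

// Mirrors the submit_to behaviour described above: run inline when already on the
// owner thread, otherwise post the work as an event; unless always_async, block
// until the owner thread has executed it.
void submit_to(EventLoop &loop, std::function<void()> fn, bool always_async)
{
  if (always_async) {
    loop.dispatch_external(std::move(fn));
    return;
  }
  if (loop.in_thread()) {
    fn();
    return;
  }
  std::mutex m;
  std::condition_variable cv;
  bool done = false;
  loop.dispatch_external([&] {
    fn();
    std::lock_guard<std::mutex> l(m);
    done = true;
    cv.notify_one();
  });
  std::unique_lock<std::mutex> l(m);
  cv.wait(l, [&] { return done; });   // the bind/listen result is ready after this
}

int main()
{
  EventLoop loop;
  std::thread worker([&] { loop.run(); });

  int r = -1;
  submit_to(loop, [&] { r = 0; /* e.g. worker->listen(...) */ }, false);
  printf("listen returned %d\n", r);

  loop.shutdown();
  worker.join();
  return 0;
}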

External events are registered with EventCenter::dispatch_event_external. Note that by this point we are already inside the Worker that belongs to this Processor.

void EventCenter::dispatch_event_external(EventCallbackRef e)
{
  uint64_t num = 0;
  {
    // append the event to external_events
    std::lock_guard lock{external_lock};
    if (external_num_events > 0 && *external_events.rbegin() == e) {
      return;
    }
    external_events.push_back(e);
    num = ++external_num_events;
  }
  if (num == 1 && !in_thread())
    // wake up the worker thread
    wakeup();

  ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
}

Whether it runs asynchronously or not, what gets executed is that same lambda:

[this, k, &listen_addr, &opts, &r]() {
        r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
      }

That is, it calls the corresponding Worker's listen function, which is simple, ordinary network programming:

int PosixWorker::listen(entity_addr_t &sa,
            unsigned addr_slot,
            const SocketOptions &opt,
                        ServerSocket *sock)
{
  // create the socket and obtain its fd
  int listen_sd = net.create_socket(sa.get_family(), true);
  ...
  // set it to non-blocking
  int r = net.set_nonblock(listen_sd);
  ...

  r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
  ...
  // bind the socket to the address
  r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
  ...
  // start listening
  r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
  ...
  // wrap the listening socket and return it to the caller via *sock
  *sock = ServerSocket(
          std::unique_ptr<PosixServerSocketImpl>(
        new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
  return 0;
}

Creating the event for the listening fd

Taking the MDS as an example, MDSDaemon::init() calls the following two:

messenger->add_dispatcher_tail(&beacon);
messenger->add_dispatcher_tail(this);
void add_dispatcher_tail(Dispatcher *d)
  {
    bool first = dispatchers.empty();
    dispatchers.push_back(d);
    if (d->ms_can_fast_dispatch_any())
      fast_dispatchers.push_back(d);
    // only the first dispatcher added triggers ready()
    if (first)
      ready();
  }

The ready() called here is AsyncMessenger::ready()

void AsyncMessenger::ready()
{
  ...
  // in the POSIX stack this ready() is an empty function
  stack->ready();
  // if we haven't bound yet, bind first
  if (pending_bind) {
    int err = bindv(pending_bind_addrs);
    ...
  }

  std::lock_guard l{lock};
  // run start() on every Processor; in POSIX mode there is only one Processor
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}

Just as with bind above, we switch to the Processor's worker and execute this lambda:

void Processor::start()
{
  ldout(msgr->cct, 1) << __func__ << dendl;

  // start thread
  worker->center.submit_to(worker->center.get_id(), [this]() {
      // remember that after bind completed we wrapped the listen fd and stored it in listen_sockets
      for (auto& listen_socket : listen_sockets) {
    if (listen_socket) {
          if (listen_socket.fd() == -1) {
            ldout(msgr->cct, 1) << __func__ 
                << " Error: processor restart after listen_socket.fd closed. " 
                << this << dendl;
            return;
          }
    // the key call: create the corresponding event for the fd we bound
      worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
                       listen_handler); }
      }
    }, false);
}

Note that this listen_handler ultimately calls Processor::accept(); the implementation is simple enough to read on your own.

EventCenter::create_file_event

int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
  ceph_assert(in_thread());
  int r = 0;
  if (fd >= nevent) {
    // grow the table geometrically (<< 2, i.e. by a factor of four), a common approach
    int new_size = nevent << 2;
    while (fd >= new_size)
      new_size <<= 2;

    ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
    // for epoll this step is not actually needed
    r = driver->resize_events(new_size);
    if (r < 0) {
      lderr(cct) << __func__ << " event count is exceed." << dendl;
      return -ERANGE;
    }
    file_events.resize(new_size);
    nevent = new_size;
  }
  // get a pointer to the element at that index
  EventCenter::FileEvent *event = _get_file_event(fd);
  ...
  // equal masks mean this event has already been created
  if (event->mask == mask)
    return 0;

  // this adds the event to the epoll watch set; nothing special here
  r = driver->add_event(fd, event->mask, mask);
  ...

  // store the callbacks in the event so they can be invoked when it fires
  event->mask |= mask;
  if (mask & EVENT_READABLE) {
    event->read_cb = ctxt;
  }
  if (mask & EVENT_WRITABLE) {
    event->write_cb = ctxt;
  }
  ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
                 << " original mask is " << event->mask << dendl;
  return 0;
}
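
For completeness, driver->add_event maps almost directly onto epoll_ctl. Here is a sketch of what the epoll side of adding a mask typically looks like (following the usual ADD-or-MOD pattern; simplified rather than copied from EpollDriver, with the mask constants declared locally):

#include <sys/epoll.h>

static const int EVENT_NONE     = 0;
static const int EVENT_READABLE = 1;
static const int EVENT_WRITABLE = 2;

// Register add_mask for fd, given that cur_mask is what epoll already watches.
// If nothing was registered yet we ADD the fd, otherwise we MOD it with the merged mask.
int epoll_add_event(int epfd, int fd, int cur_mask, int add_mask)
{
  struct epoll_event ee{};
  int op = (cur_mask == EVENT_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  int mask = cur_mask | add_mask;
  if (mask & EVENT_READABLE) ee.events |= EPOLLIN;
  if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
  ee.data.fd = fd;

  return ::epoll_ctl(epfd, op, fd, &ee);
}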

Processor::accept()

void Processor::accept() {
  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
  opts.priority = msgr->get_socket_priority();

  for (auto &listen_socket: listen_sockets) {
    ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
                         << dendl;
    unsigned accept_error_num = 0;

    while (true) {
      entity_addr_t addr;
      ConnectedSocket cli_socket;
      Worker *w = worker;
      // if the stack has no local (per-worker) listen table (the POSIX case), pick a low-load Worker
      if (!msgr->get_stack()->support_local_listen_table())
        w = msgr->get_stack()->get_worker();
      else
        ++w->references;
      int r = listen_socket.accept(&cli_socket, opts, &addr, w);
      if (r == 0) {
        ...

        msgr->add_accept(
                w, std::move(cli_socket),
                msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
                addr);
        accept_error_num = 0;
        continue;
      } else {
        // error handling (elided)
      }
    }
  }
}
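
The error-handling branch elided above mostly needs to tell "nothing left to accept" apart from real failures. A sketch of the usual pattern for a non-blocking listening socket follows; the retry threshold and the choice of which errors abort are illustrative, not Ceph's exact policy.

#include <cerrno>
#include <sys/socket.h>

// Drain the accept queue of a non-blocking listening socket, returning when
// there is nothing left to accept or when errors keep repeating.
void accept_loop(int listen_fd)
{
  unsigned accept_error_num = 0;
  while (true) {
    int fd = ::accept(listen_fd, nullptr, nullptr);
    if (fd >= 0) {
      accept_error_num = 0;
      // hand the connected fd over to a worker (add_accept in the real code) ...
      continue;
    }
    if (errno == EINTR || errno == ECONNABORTED)
      continue;                  // transient: just retry
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;                     // queue drained: go back to waiting in epoll
    if (++accept_error_num > 4)
      break;                     // repeated failures (e.g. EMFILE): give up for now
  }
}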

// all of this is ordinary socket setup

int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
  ceph_assert(sock);
  sockaddr_storage ss;
  socklen_t slen = sizeof(ss);
  int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen); // accept with close-on-exec; the listening fd is non-blocking, so this does not block
  ...
  int r = handler.set_nonblock(sd);
  ...
  r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
  ...
  out->set_type(addr_type);
  out->set_sockaddr((sockaddr*)&ss);
  handler.set_priority(sd, opt.priority, out->get_family());

  std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
  *sock = ConnectedSocket(std::move(csi));
  return 0;
}
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
                                const entity_addr_t &listen_addr,
                                const entity_addr_t &peer_addr) {
  std::lock_guard l{lock};
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
                                              listen_addr.is_msgr2(), false);
  // two main things happen here: 1. update the AsyncConnection's state, 2. register the connection's events for monitoring again
  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
  // keep track of the connection being accepted
  accepting_conns.insert(conn);
}

From here on, the AsyncConnection carries out the various handshake and verification steps; once those complete, data can be read and written.
