Envoy Source Code Analysis, Part 2: Envoy Startup and New Connection Establishment

Code version: stable/v1.7.1
In the previous article we analyzed the first step of Envoy startup: the initialization of its six modules, one of the more important of which is the initialization of the server. This article continues from there, walking through the startup flow after the server instance has been initialized, and how new connections are established.

1. Entry point

The entry point is still int main in main.cc:

/**
 * Basic Site-Specific main()
 *
 * This should be used to do setup tasks specific to a particular site's
 * deployment such as initializing signal handling. It calls main_common
 * after setting up command line options.
 */
int main(int argc, char** argv) {
  std::unique_ptr<Envoy::MainCommon> main_common;

  // Initialize the server's main context under a try/catch loop and simply return EXIT_FAILURE
  // as needed. Whatever code in the initialization path that fails is expected to log an error
  // message so the user can diagnose.
  try {
    main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
  } catch (const Envoy::NoServingException& e) {
    return EXIT_SUCCESS;
  } catch (const Envoy::MalformedArgvException& e) {
    return EXIT_FAILURE;
  } catch (const Envoy::EnvoyException& e) {
    return EXIT_FAILURE;
  }

  // Run the server listener loop outside try/catch blocks, so that unexpected exceptions
  // show up as a core-dumps for easier diagnostics.
  return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}

main_common here is an instance of Envoy::MainCommon. In its class definition, run() is defined as follows:

class MainCommon {
public:
  MainCommon(int argc, const char* const* argv);
  bool run() { return base_.run(); }

  static std::string hotRestartVersion(uint64_t max_num_stats, uint64_t max_stat_name_len,
                                       bool hot_restart_enabled);

private:
#ifdef ENVOY_HANDLE_SIGNALS
  Envoy::SignalAction handle_sigs;
  Envoy::TerminateHandler log_on_terminate;
#endif

  Envoy::OptionsImpl options_;
  MainCommonBase base_;
};

base_ is an instance of MainCommonBase; the concrete implementation of run() is in main_common.cc:

bool MainCommonBase::run() {
  switch (options_.mode()) {
  case Server::Mode::Serve:
    server_->run();
    return true;
  case Server::Mode::Validate: {
    auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
    return Server::validateConfig(options_, local_address, component_factory_);
  }
  case Server::Mode::InitOnly:
    PERF_DUMP();
    return true;
  }
  NOT_REACHED;
}

Correspondingly, in Serve mode this executes server_->run(), whose code is:

void InstanceImpl::run() {
  RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });

  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();

  terminate();
}

This is the core startup logic of run(). The figure below outlines the main flow from Envoy startup to connection establishment; it breaks down into three parts: starting the workers, loading the listeners, and accepting new connections.


[Figure: Envoy startup and new connection establishment]

2. Starting the workers
 RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });

In server.cc, the server uses RunHelper to kick off startWorkers(), whose code is:

void InstanceImpl::startWorkers() {
  listener_manager_->startWorkers(*guard_dog_);

  // At this point we are ready to take traffic and all listening ports are up. Notify our parent
  // if applicable that they can stop listening and drain.
  restarter_.drainParentListeners();
  drain_manager_->startParentShutdownSequence();
}

Let's look at the first line, listener_manager_->startWorkers(*guard_dog_):

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
  ENVOY_LOG(info, "all dependencies initialized. starting workers");
  ASSERT(!workers_started_);
  workers_started_ = true;
  for (const auto& worker : workers_) {
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, *listener);
    }
    worker->start(guard_dog);
  }
}

Setting aside the assertions and logging, the core is two for loops. The outer loop iterates over the workers_ created during initialization (recall the initialization of workers_ in https://www.jianshu.com/p/204d1631239d); for each worker, the inner loop takes every listener in the active state and adds it to that worker via addListenerToWorker(*worker, *listener). Finally, each worker is started.
So let's see what addListenerToWorker does.

void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
  worker.addListener(listener, [this, &listener](bool success) -> void {
    // The add listener completion runs on the worker thread. Post back to the main thread to
    // avoid locking.
    server_.dispatcher().post([this, success, &listener]() -> void {
      // It is theoretically possible for a listener to get added on 1 worker but not the others.
      // The below check with onListenerCreateFailure() is there to ensure we execute the
      // removal/logging/stats at most once on failure. Note also that that drain/removal can race
      // with addition. It's guaranteed that workers process remove after add so this should be
      // fine.
      if (!success && !listener.onListenerCreateFailure()) {
        // TODO(mattklein123): In addition to a critical log and a stat, we should consider adding
        //                     a startup option here to cause the server to exit. I think we
        //                     probably want this at Lyft but I will do it in a follow up.
        ENVOY_LOG(critical, "listener '{}' failed to listen on address '{}' on worker",
                  listener.name(), listener.socket().localAddress()->asString());
        stats_.listener_create_failure_.inc();
        removeListener(listener.name());
      }
      if (success) {
        stats_.listener_create_success_.inc();
      }
    });
  });
}

The worker's addListener method takes two arguments: the listener to add, and a completion callback invoked once the listener has (or has not) been added, as the comment in its header file shows:

/**
   * Add a listener to the worker.
   * @param listener supplies the listener to add.
   * @param completion supplies the completion to call when the listener has been added (or not) on
   *                   the worker.
   */
  virtual void addListener(Network::ListenerConfig& listener,
                           AddListenerCompletion completion) PURE;

Looking back at the worker's addListener implementation below: it calls the dispatcher's post() method with a callback that captures addListener's own arguments, the listener and the externally supplied completion callback, and uses them to add the listener to the worker:

void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
  // All listener additions happen via post. However, we must deal with the case where the listener
  // can not be created on the worker. There is a race condition where 2 processes can successfully
  // bind to an address, but then fail to listen() with EADDRINUSE. During initial startup, we want
  // to surface this.
  dispatcher_->post([this, &listener, completion]() -> void {
    try {
      handler_->addListener(listener);
      hooks_.onWorkerListenerAdded();
      completion(true);
    } catch (const Network::CreateListenerException& e) {
      completion(false);
    }
  });
}

Inside the body of that posted callback, handler_->addListener(listener) is the core of adding the listener:

void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
  ActiveListenerPtr l(new ActiveListener(*this, config));
  listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}

handler_->addListener(listener) is implemented by ConnectionHandlerImpl as shown above. The config argument is one of the active listeners from the inner for loop described earlier; it is passed to the ActiveListener constructor to build an ActiveListener, which is then held through a pointer. The ActiveListener constructor looks like this:

ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
                                                      Network::ListenerConfig& config)
    : ActiveListener(
          parent,
          parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
                                            config.handOffRestoredDestinationConnections()),
          config) {}

That createListener call is implemented in the DispatcherImpl class. (To keep our bearings: we are still following the addListener flow; the listener has to be created before it can be added, so we are in connection_handler_impl.cc, and in the constructor above createListener is invoked on the dispatcher object.) The implementation is essentially a single statement, return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)};, which creates a ListenerImpl with new and wraps it in a Network::ListenerPtr. The created listener is returned to ConnectionHandlerImpl::addListener and finally stored in listeners_, a list declared in connection_handler_impl.h as std::list<std::pair<Network::Address::InstanceConstSharedPtr, ActiveListenerPtr>> listeners_;.
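
For orientation, here is a minimal sketch of what DispatcherImpl::createListener looks like, reconstructed around the return statement quoted above (the thread-safety assertion and exact formatting are assumptions about the 1.7-era dispatcher_impl.cc):

Network::ListenerPtr
DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbacks& cb,
                               bool bind_to_port, bool hand_off_restored_destination_connections) {
  ASSERT(isThreadSafe()); // listeners may only be created on the dispatcher's own thread
  // Construct a libevent-backed ListenerImpl, wrap it in a smart pointer and hand it back to
  // ConnectionHandlerImpl::addListener, the caller we came from.
  return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port,
                                                        hand_off_restored_destination_connections)};
}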

3. Loading the listener & accepting connections

The previous part already covered a good deal of the listener loading flow (addListener); what follows focuses on connection establishment after loading. Let's revisit the Envoy startup and connection acceptance figure.


[Figure: Envoy startup and new connection establishment]

In sections 1 and 2 we completed the flow from server_->run() to createListener. Now let's dig into that final statement from section 2 and see how the Listener object is implemented; after all, this is where listening for upstream/downstream connections and the callback handling actually happen.

As noted above, the implementation is a single statement, return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)};, which creates a ListenerImpl with new and wraps it in a Network::ListenerPtr. Here is the ListenerImpl constructor:

ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
                           bool bind_to_port, bool hand_off_restored_destination_connections)
    : local_address_(nullptr), cb_(cb),
      hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
      listener_(nullptr) {
  const auto ip = socket.localAddress()->ip();

  // Only use the listen socket's local address for new connections if it is not the all hosts
  // address (e.g., 0.0.0.0 for IPv4).
  if (!(ip && ip->isAnyAddress())) {
    local_address_ = socket.localAddress();
  }

  if (bind_to_port) {
    listener_.reset(
        evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

    if (!listener_) {
      throw CreateListenerException(
          fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
    }

    if (!Network::Socket::applyOptions(socket.options(), socket, Socket::SocketState::Listening)) {
      throw CreateListenerException(fmt::format(
          "cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
    }

    evconnlistener_set_error_cb(listener_.get(), errorCallback);
  }
}

In listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers the listening socket's fd with libevent so that new connections are delivered through the listenCallback callback. listenCallback is shown below; this is where listener->cb_.onAccept() is invoked.

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
                                  int remote_addr_len, void* arg) {
  ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
  // Get the local address from the new socket if the listener is listening on IP ANY
  // (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
  const Address::InstanceConstSharedPtr& local_address =
      listener->local_address_ ? listener->local_address_ : listener->getLocalAddress(fd);
  // The accept() call that filled in remote_addr doesn't fill in more than the sa_family field
  // for Unix domain sockets; apparently there isn't a mechanism in the kernel to get the
  // sockaddr_un associated with the client socket when starting from the server socket.
  // We work around this by using our own name for the socket in this case.
  // Pass the 'v6only' parameter as true if the local_address is an IPv6 address. This has no effect
  // if the socket is a v4 socket, but for v6 sockets this will create an IPv4 remote address if an
  // IPv4 local_address was created from an IPv6 mapped IPv4 address.
  const Address::InstanceConstSharedPtr& remote_address =
      (remote_addr->sa_family == AF_UNIX)
          ? Address::peerAddressFromFd(fd)
          : Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr),
                                         remote_addr_len,
                                         local_address->ip()->version() == Address::IpVersion::v6);
  listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address),
                         listener->hand_off_restored_destination_connections_);
}

cb_'s onAccept is ConnectionHandlerImpl::ActiveListener::onAccept, implemented as follows:

void ConnectionHandlerImpl::ActiveListener::onAccept(
    Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
  Network::Address::InstanceConstSharedPtr local_address = socket->localAddress();
  auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
                                                      hand_off_restored_destination_connections);

  // Create and run the filters
  config_.filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (active_socket->iter_ != active_socket->accept_filters_.end()) {
    active_socket->moveIntoListBack(std::move(active_socket), sockets_);
  }
}

In the code above, config_.filterChainFactory().createListenerFilterChain(*active_socket); builds the chain of listener filters, and continueFilterChain() then runs those filters.
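
To make the hand-off from listener filters to the eventual connection concrete, here is a simplified sketch of ActiveSocket::continueFilterChain. It assumes each filter either continues or asks to stop and resume later; the real connection_handler_impl.cc also handles handing restored-destination sockets off to another listener, which is omitted here:

void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
  if (success) {
    // Advance to the next listener filter (or start at the beginning on the first call).
    iter_ = (iter_ == accept_filters_.end()) ? accept_filters_.begin() : std::next(iter_);
    for (; iter_ != accept_filters_.end(); iter_++) {
      if ((*iter_)->onAccept(*this) == Network::FilterStatus::StopIteration) {
        // The filter will call continueFilterChain() again later; this is why onAccept() above
        // may park the ActiveSocket in sockets_ in the meantime.
        return;
      }
    }
    // All listener filters have run: promote the accepted socket into a real server connection.
    listener_.newConnection(std::move(socket_));
  }
  // (The real code also removes this ActiveSocket from sockets_ here if it had been parked.)
}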

Let's take stock here before moving on to how Envoy accepts connections. As mentioned above,

in listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers the listening socket's fd so that new connections arrive through the listenCallback callback.

In other words, when Envoy runs as a sidecar in a pod to proxy the client's traffic, it listens on the corresponding port; when the client issues a request through that port, Envoy's callback is triggered and the following call chain runs: listenCallback -> cb_.onAccept -> createListenerFilterChain + continueFilterChain:

  • createListenerFilterChain builds the listener filter chain; this is the extension point for listener filters (a sketch follows this list)
  • continueFilterChain kicks off those filters
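
As a rough sketch (assuming the 1.7-era layout of listener_manager_impl.cc and configuration_impl.cc), createListenerFilterChain simply runs every configured listener-filter factory against the socket:

// ListenerImpl acts as the Network::FilterChainFactory for its own listener filters.
bool ListenerImpl::createListenerFilterChain(Network::ListenerFilterManager& manager) {
  // Each factory callback calls manager.addAcceptFilter(...) to append one listener filter
  // (e.g. original_dst, proxy_protocol, tls_inspector) to the ActiveSocket's accept_filters_.
  return Configuration::FilterChainUtility::buildFilterChain(manager, listener_filter_factories_);
}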
Accepting a connection then breaks down into two parts:
  • createServerConnection, called from listener_.newConnection(std::move(socket_)) inside continueFilterChain
    createServerConnection establishes the connection between the client side of the request and the Envoy server, and it is also where the buffer high/low watermarks used for flow control are applied. It returns a pointer to a connection object whose constructor lives in connection_impl.cc, and it is in that constructor that the FileEvent is created:
  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  file_event_ = dispatcher_.createFileEvent(
      fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
      Event::FileReadyType::Read | Event::FileReadyType::Write);

createFileEvent constructs a new FileEventImpl, which assigns the requested events via assignEvents() and then registers them with libevent via event_add().
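
A minimal sketch of that constructor (member names such as raw_event_ are recalled from the 1.7-era file_event_impl.cc and may differ slightly):

FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb,
                             FileTriggerType trigger, uint32_t events)
    : cb_(cb), base_(&dispatcher.base()), fd_(fd), trigger_(trigger) {
  // Translate the FileReadyType flags (Read/Write) and the trigger type into libevent flags
  // bound to raw_event_.
  assignEvents(events);
  // Register the event with libevent; from now on the callback (onFileEvent in ConnectionImpl's
  // case) fires whenever fd becomes ready.
  event_add(&raw_event_, nullptr);
}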

  • createNetworkFilterChain, also called from listener_.newConnection(std::move(socket_)) inside continueFilterChain
    After createServerConnection, createNetworkFilterChain builds the network filter chain:
  const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
      *new_connection, filter_chain->networkFilterFactories());

Under the hood, buildFilterChain() runs the FilterFactory callbacks and finally returns filter_manager's initializeReadFilters(), which initializes the read filters (a sketch of buildFilterChain follows the snippet below). If the network filter chain is empty, i.e. empty_filter_chain is true, the connection is closed:

  if (empty_filter_chain) {
    ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "closing connection: no filters",
                             *new_connection);
    new_connection->close(Network::ConnectionCloseType::NoFlush);
    return;
  }
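
For completeness, a sketch of the network-filter flavor of buildFilterChain (found in server/configuration_impl.cc; treat the exact namespace and signature as assumptions):

bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
                                          const std::vector<Network::FilterFactoryCb>& factories) {
  // Each factory callback adds its read/write filters to the connection's filter manager.
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager);
  }
  // Returns false when no read filters were added, which is how empty_filter_chain ends up true.
  return filter_manager.initializeReadFilters();
}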
  • Once the two steps above complete, a count is taken: onNewConnection wraps the connection in an ActiveConnection and increments the listener's connection count by one.
4. Recap

OK, at this point we have walked through everything promised at the start of this article: Envoy's startup and the establishment of a new connection when a client sends a request (i.e. the listening implementation), as summarized in the figure below.
[Figure: Envoy startup and new connection establishment]

Now, going back to our original starting point in main(), let's recap what server_->run() does. Remember its implementation?

void InstanceImpl::run() {
  RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });

  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();

  terminate();
}
  • The RunHelper starts the workers, adds the listeners, and wires up the callbacks and filter chains.
  • createWatchDog starts the watchdog that guards against deadlocks.
  • dispatcher_->run(Event::Dispatcher::RunType::Block); runs the dispatcher, which calls libevent's event_base_loop to listen for events; when a new event arrives, it enters the processing flow (a rough sketch follows this list). We will analyze that part in the next article.
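
As a small preview of that next article, here is a rough sketch of DispatcherImpl::run (based on the 1.7-era dispatcher_impl.cc; runPostCallbacks and the exact flags are recalled from memory):

void DispatcherImpl::run(RunType type) {
  // Flush callbacks that were post()ed before the loop starts; libevent gives no ordering
  // guarantee, so these must run before any other event fires.
  runPostCallbacks();
  // Enter libevent's event loop. In Block mode this only returns once the loop is exited.
  event_base_loop(base_.get(), type == RunType::NonBlock ? EVLOOP_NONBLOCK : 0);
}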