Code version: stable/v1.7.1
In the previous article we analyzed the first step of Envoy's startup: the initialization of six modules, of which the server initialization is one of the more important parts. This article continues the walkthrough of the startup flow after the server instance has been initialized, and of how new connections are established.
1. Entry point
The entry point is still int main in main.cc, as follows:
/**
* Basic Site-Specific main()
*
* This should be used to do setup tasks specific to a particular site's
* deployment such as initializing signal handling. It calls main_common
* after setting up command line options.
*/
int main(int argc, char** argv) {
std::unique_ptr<Envoy::MainCommon> main_common;
// Initialize the server's main context under a try/catch loop and simply return EXIT_FAILURE
// as needed. Whatever code in the initialization path that fails is expected to log an error
// message so the user can diagnose.
try {
main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
} catch (const Envoy::NoServingException& e) {
return EXIT_SUCCESS;
} catch (const Envoy::MalformedArgvException& e) {
return EXIT_FAILURE;
} catch (const Envoy::EnvoyException& e) {
return EXIT_FAILURE;
}
// Run the server listener loop outside try/catch blocks, so that unexpected exceptions
// show up as a core-dumps for easier diagnostics.
return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}
Here main_common is an instance of Envoy::MainCommon. In its class definition, run is defined as follows:
class MainCommon {
public:
MainCommon(int argc, const char* const* argv);
bool run() { return base_.run(); }
static std::string hotRestartVersion(uint64_t max_num_stats, uint64_t max_stat_name_len,
bool hot_restart_enabled);
private:
#ifdef ENVOY_HANDLE_SIGNALS
Envoy::SignalAction handle_sigs;
Envoy::TerminateHandler log_on_terminate;
#endif
Envoy::OptionsImpl options_;
MainCommonBase base_;
};
base_ is an instance of MainCommonBase; the concrete implementation of run is found in main_common.cc:
bool MainCommonBase::run() {
switch (options_.mode()) {
case Server::Mode::Serve:
server_->run();
return true;
case Server::Mode::Validate: {
auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
return Server::validateConfig(options_, local_address, component_factory_);
}
case Server::Mode::InitOnly:
PERF_DUMP();
return true;
}
NOT_REACHED;
}
Accordingly, in Serve mode this executes the server_->run() method, whose code is:
void InstanceImpl::run() {
RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
[this]() -> void { startWorkers(); });
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
watchdog->startWatchdog(*dispatcher_);
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
This is the core startup logic of run. The figure below sketches the main flow from Envoy startup to connection establishment, which consists of three parts: starting the workers, loading the listeners, and accepting new connections.
2. Starting the workers
RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
[this]() -> void { startWorkers(); });
In server.cc, the server uses a RunHelper to kick off startWorkers(), whose code is:
void InstanceImpl::startWorkers() {
listener_manager_->startWorkers(*guard_dog_);
// At this point we are ready to take traffic and all listening ports are up. Notify our parent
// if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
drain_manager_->startParentShutdownSequence();
}
Let's first look at listener_manager_->startWorkers(*guard_dog_):
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
for (const auto& worker : workers_) {
ASSERT(warming_listeners_.empty());
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, *listener);
}
worker->start(guard_dog);
}
}
Setting aside the assertions and logging, the core is two nested for loops. The outer loop iterates over workers_ (recall how workers_ was initialized in https://www.jianshu.com/p/204d1631239d) and takes each worker in turn; the inner loop takes every listener in the active state and adds it to that worker by calling addListenerToWorker(*worker, *listener). Finally, each worker is started.
So let's see what addListenerToWorker does.
void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
worker.addListener(listener, [this, &listener](bool success) -> void {
// The add listener completion runs on the worker thread. Post back to the main thread to
// avoid locking.
server_.dispatcher().post([this, success, &listener]() -> void {
// It is theoretically possible for a listener to get added on 1 worker but not the others.
// The below check with onListenerCreateFailure() is there to ensure we execute the
// removal/logging/stats at most once on failure. Note also that that drain/removal can race
// with addition. It's guaranteed that workers process remove after add so this should be
// fine.
if (!success && !listener.onListenerCreateFailure()) {
// TODO(mattklein123): In addition to a critical log and a stat, we should consider adding
// a startup option here to cause the server to exit. I think we
// probably want this at Lyft but I will do it in a follow up.
ENVOY_LOG(critical, "listener '{}' failed to listen on address '{}' on worker",
listener.name(), listener.socket().localAddress()->asString());
stats_.listener_create_failure_.inc();
removeListener(listener.name());
}
if (success) {
stats_.listener_create_success_.inc();
}
});
});
}
The worker's addListener method takes two parameters: the listener, and a callback invoked once the add has completed (successfully or not). This can be seen from the comment in its header file:
/**
* Add a listener to the worker.
* @param listener supplies the listener to add.
* @param completion supplies the completion to call when the listener has been added (or not) on
* the worker.
*/
virtual void addListener(Network::ListenerConfig& listener,
AddListenerCompletion completion) PURE;
Turning back to the worker's addListener below: it calls the dispatcher's post method, passing a callback that captures addListener's own listener argument and the externally supplied completion callback; this posted callback performs the actual addition of the listener to the worker:
void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
// All listener additions happen via post. However, we must deal with the case where the listener
// can not be created on the worker. There is a race condition where 2 processes can successfully
// bind to an address, but then fail to listen() with EADDRINUSE. During initial startup, we want
// to surface this.
dispatcher_->post([this, &listener, completion]() -> void {
try {
handler_->addListener(listener);
hooks_.onWorkerListenerAdded();
completion(true);
} catch (const Network::CreateListenerException& e) {
completion(false);
}
});
}
In the body of that posted callback, handler_->addListener(listener) is the core code that adds the listener:
void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
ActiveListenerPtr l(new ActiveListener(*this, config));
listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}
handler_->addListener(listener) is implemented by ConnectionHandlerImpl as shown above. The config parameter is the active-listener configuration from the inner of the two for loops; it is passed to the ActiveListener constructor to build an ActiveListener, which is then held by a pointer. The ActiveListener constructor looks like this:
ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
Network::ListenerConfig& config)
: ActiveListener(
parent,
parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
config.handOffRestoredDestinationConnections()),
config) {}
The createListener call is implemented in the DispatcherImpl class. (To be clear, we are still tracing the addListener flow: the listener has to be created before it can be added, so we are currently in connection_handler_impl.cc, and inside the ActiveListener constructor the createListener call is made on the dispatcher object.) Its implementation is essentially a single statement: return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)};. This news a ListenerImpl and wraps it in a Network::ListenerPtr. The Listener created here is returned to ConnectionHandlerImpl::addListener and is finally stored in listeners_, a std::list<std::pair<Network::Address::InstanceConstSharedPtr, ActiveListenerPtr>> defined in connection_handler_impl.h.
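For reference, DispatcherImpl::createListener in dispatcher_impl.cc is roughly the following (paraphrased around the statement quoted above; the surrounding assertions may differ slightly in the actual source):
Network::ListenerPtr DispatcherImpl::createListener(Network::Socket& socket,
                                                    Network::ListenerCallbacks& cb,
                                                    bool bind_to_port,
                                                    bool hand_off_restored_destination_connections) {
  ASSERT(isThreadSafe());
  // Wrap the newly created ListenerImpl in a Network::ListenerPtr and hand it back to the caller.
  return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port,
                                                        hand_off_restored_destination_connections)};
}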
3. Listener loading & accepting connections
The previous section already covered a good part of the listener-loading flow (addListener); this part focuses mainly on how connections are established once the listeners are loaded. Let's revisit the diagram of Envoy startup and connection acceptance.
In sections 1 and 2 we completed the path from server_->run() to createListener. Now let's dig into that last statement from section 2 and see how the Listener object itself is implemented, since this is where the listening for upstream and downstream connections and the callback handling actually happen.
As noted above, the implementation is the single statement return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)};, which news a ListenerImpl and holds it with a Network::ListenerPtr. Its constructor is:
ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections)
: local_address_(nullptr), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
const auto ip = socket.localAddress()->ip();
// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
}
if (bind_to_port) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}
if (!Network::Socket::applyOptions(socket.options(), socket, Socket::SocketState::Listening)) {
throw CreateListenerException(fmt::format(
"cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
}
evconnlistener_set_error_cb(listener_.get(), errorCallback);
}
}
In listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers the listening socket's fd so that new connections are delivered through the listenCallback callback. listenCallback is shown below; inside it, listener->cb_.onAccept() is invoked.
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
// Get the local address from the new socket if the listener is listening on IP ANY
// (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
const Address::InstanceConstSharedPtr& local_address =
listener->local_address_ ? listener->local_address_ : listener->getLocalAddress(fd);
// The accept() call that filled in remote_addr doesn't fill in more than the sa_family field
// for Unix domain sockets; apparently there isn't a mechanism in the kernel to get the
// sockaddr_un associated with the client socket when starting from the server socket.
// We work around this by using our own name for the socket in this case.
// Pass the 'v6only' parameter as true if the local_address is an IPv6 address. This has no effect
// if the socket is a v4 socket, but for v6 sockets this will create an IPv4 remote address if an
// IPv4 local_address was created from an IPv6 mapped IPv4 address.
const Address::InstanceConstSharedPtr& remote_address =
(remote_addr->sa_family == AF_UNIX)
? Address::peerAddressFromFd(fd)
: Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr),
remote_addr_len,
local_address->ip()->version() == Address::IpVersion::v6);
listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address),
listener->hand_off_restored_destination_connections_);
}
cb_'s onAccept is ConnectionHandlerImpl::ActiveListener::onAccept, implemented as follows:
void ConnectionHandlerImpl::ActiveListener::onAccept(
Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
Network::Address::InstanceConstSharedPtr local_address = socket->localAddress();
auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);
// Create and run the filters
config_.filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);
// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
active_socket->moveIntoListBack(std::move(active_socket), sockets_);
}
}
In the code above, config_.filterChainFactory().createListenerFilterChain(*active_socket); creates the chain of listener filters, and continueFilterChain() is then called to run those filters.
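continueFilterChain() is worth a quick look, since it is where control eventually reaches listener_.newConnection(). A simplified sketch of its iteration logic, with the ActiveSocket cleanup at the end omitted (see the actual source for the full version):
void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
  if (success) {
    // Resume from the current position in the listener filter chain, or start from the
    // beginning on the first call (iter_ is initially accept_filters_.end()).
    iter_ = (iter_ == accept_filters_.end()) ? accept_filters_.begin() : std::next(iter_);
    for (; iter_ != accept_filters_.end(); ++iter_) {
      if ((*iter_)->onAccept(*this) == Network::FilterStatus::StopIteration) {
        // The filter is responsible for calling continueFilterChain() again later to resume.
        return;
      }
    }
    // All listener filters ran to completion: hand the socket off to create the connection.
    listener_.newConnection(std::move(socket_));
  }
}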
Before continuing to look at how Envoy accepts connections, let's briefly recap.
As mentioned above:
In listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers the listening socket's fd for new connections, delivered through the listenCallback callback.
In other words, when Envoy runs as a sidecar in a pod proxying client traffic, it listens on the corresponding port; when the client sends a request through that port, the corresponding Envoy callback fires and the following call chain executes: listenCallback -> cb_.onAccept -> createListenerFilterChain + continueFilterChain, where
- createListenerFilterChain creates the FilterChain, which is the extension point for filters
- continueFilterChain runs the filters
Accepting a connection consists of two main parts:
- createServerConnection, reached via listener_.newConnection(std::move(socket_)); inside continueFilterChain
createServerConnection establishes the connection between the requesting client and the Envoy server, and this is where the buffer high/low watermarks for flow control come in. createServerConnection returns a pointer to a connection object whose constructor is in connection_impl.cc, and it is in that constructor that the FileEvent is created:
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_ = dispatcher_.createFileEvent(
fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Write);
After createFileEvent is called, a new FileEventImpl is constructed; it assigns the events via assignEvents() and then registers them with event_add() (a standalone libevent sketch of this assign-then-add pattern appears after this list).
- createNetworkFilterChain, also reached via listener_.newConnection(std::move(socket_)); inside continueFilterChain
After createServerConnection, the network filter chain is created via createNetworkFilterChain, as shown below.
const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
*new_connection, filter_chain->networkFilterFactories());
The FilterFactory callbacks are executed inside buildFilterChain() (a sketch appears after this list), which finishes by returning the filter_manager's initializeReadFilters() to initialize the read filters. If the network filter chain is empty, i.e. empty_filter_chain is true, the connection is closed:
if (empty_filter_chain) {
ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "closing connection: no filters",
*new_connection);
new_connection->close(Network::ConnectionCloseType::NoFlush);
return;
}
- Once the two steps above complete, a count is recorded: via onNewConnection, the listener's ActiveConnection count is incremented by 1.
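Regarding the first part above: the assignEvents()/event_add() sequence inside FileEventImpl is the usual libevent pattern of assigning an event to an event_base and then adding it. Below is a minimal, self-contained libevent example of that pattern; it is independent of Envoy's classes, and the fd and callback are purely illustrative:
#include <event2/event.h>
#include <cstdio>

// Invoked by libevent whenever the fd becomes readable or writable.
static void on_file_event(evutil_socket_t fd, short what, void*) {
  std::printf("fd %d ready:%s%s\n", static_cast<int>(fd),
              (what & EV_READ) ? " read" : "", (what & EV_WRITE) ? " write" : "");
}

int main() {
  event_base* base = event_base_new();

  // Assign the event to the base (edge-triggered, persistent, read readiness on stdin),
  // then register it: the same assign-then-add sequence described above.
  event ev;
  event_assign(&ev, base, /*fd=*/0, EV_READ | EV_ET | EV_PERSIST, on_file_event, nullptr);
  event_add(&ev, nullptr);

  event_base_loop(base, 0);  // dispatch until there are no more pending events
  event_base_free(base);
  return 0;
}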
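Regarding the second part: based on the description above, buildFilterChain() essentially invokes each network filter factory callback against the connection's filter manager and then calls initializeReadFilters(). A simplified sketch (not a verbatim copy of the source):
bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
                                          const std::vector<Network::FilterFactoryCb>& factories) {
  // Each factory callback installs its filter(s) on the connection's filter manager.
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager);
  }
  // Returns false when no read filters were added, which is what makes empty_filter_chain
  // true in the caller and causes the connection to be closed.
  return filter_manager.initializeReadFilters();
}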
4. Recap
OK, at this point we have walked through everything this article set out to cover: Envoy startup, and the establishment of a new connection (the listening implementation) when a client sends a request, summarized in the figure below. Now let's go back to the very beginning, with the main function as the starting point, and recall what server_->run() did. Remember its implementation?
void InstanceImpl::run() {
RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
[this]() -> void { startWorkers(); });
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
watchdog->startWatchdog(*dispatcher_);
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
- The RunHelper starts the workers, adds the listeners, and wires up the callbacks and filter chains
- createWatchDog creates the watchdog that guards against deadlocks
- dispatcher_->run(Event::Dispatcher::RunType::Block); runs the dispatcher; once running, the dispatcher calls libevent's event_base_loop to listen for events, and when a new event arrives it enters the processing flow. We will analyze that part in the next article.