Horovod is a distributed training framework open-sourced by Uber that supports the mainstream machine learning frameworks (TensorFlow, PyTorch, and MXNet). This article walks through Horovod's core implementation, based on version v0.21.1, as well as its integration with the individual frameworks.
Horovod's workflow is fairly simple. A message queue receives requests for three ops: AllReduce, AllGather, and Broadcast. A background thread polls this queue at regular intervals; once it has pulled a batch of ops, it fuses their tensors and then performs the corresponding collective operations. If a tensor lives in GPU memory, the operation is executed with the NCCL library; if it lives in host memory, MPI or Gloo is used instead.
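To make this concrete, here is a minimal, self-contained sketch of that producer-consumer pattern. The names (OpRequest, MessageQueue, fuse_and_execute) are invented for illustration and are not Horovod's actual types; they only model the shape of the loop described above.

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical request type: in Horovod this role is played by
// horovod::common::Request (ALLREDUCE / ALLGATHER / BROADCAST).
struct OpRequest {
  enum Type { ALLREDUCE, ALLGATHER, BROADCAST } type;
  std::string tensor_name;
};

// Thread-safe queue shared between the framework threads (producers) and the
// single background thread (consumer).
class MessageQueue {
 public:
  void push(OpRequest req) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push_back(std::move(req));
  }
  // Drain everything currently in the queue: one "cycle" worth of work.
  std::vector<OpRequest> pop_all() {
    std::lock_guard<std::mutex> lock(mu_);
    std::vector<OpRequest> batch(queue_.begin(), queue_.end());
    queue_.clear();
    return batch;
  }
 private:
  std::mutex mu_;
  std::deque<OpRequest> queue_;
};

// Stand-in for tensor fusion + NCCL/MPI/Gloo dispatch.
void fuse_and_execute(const std::vector<OpRequest>& batch) { /* ... */ }

// Background thread: wake up once per cycle, drain the queue, fuse, execute.
void background_loop(MessageQueue& queue, std::atomic<bool>& shut_down,
                     std::chrono::milliseconds cycle_time) {
  while (!shut_down) {
    std::this_thread::sleep_for(cycle_time);
    auto batch = queue.pop_all();
    if (!batch.empty()) {
      fuse_and_execute(batch);  // in Horovod: PerformOperation per fused Response
    }
  }
}
```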
Horovod's core code lives in the horovod/common directory. The file operations.cc is effectively Horovod's entry point; it contains important functions such as BackgroundThreadLoop and RunLoopOnce. Following these functions gives a good first view of how everything fits together.
Let's start with RunLoopOnce. Some optimization-related code (response cache, auto-tuning, etc.) has been omitted here:
```cpp
bool RunLoopOnce(HorovodGlobalState& state) {
  // Check whether a full cycle time (CycleTimeMs) has elapsed since the last
  // cycle started; if not, sleep for the remainder.
  auto start_time = std::chrono::steady_clock::now();
  auto sleep_duration =
      state.last_cycle_start +
      std::chrono::microseconds(long(state.parameter_manager.CycleTimeMs() * 1000.)) -
      start_time;
  if (sleep_duration > std::chrono::steady_clock::duration::zero()) {
    std::this_thread::sleep_for(sleep_duration);
  }
  state.last_cycle_start = std::chrono::steady_clock::now();

  // Record the cycle in the Timeline; the resulting trace can later be opened
  // in Chrome for inspection.
  if (state.mark_cycles_in_timeline) {
    // Mark start of the new cycle.
    state.timeline.MarkCycleStart();
  }

  auto response_list =
      state.controller->ComputeResponseList(horovod_global.shut_down, state);
  state.mark_cycles_in_timeline =
      state.controller->MarkCyclesInTimelinePending();

  // Perform the collective operation for each response.
  for (auto& response : response_list.responses()) {
    PerformOperation(response, horovod_global);
  }

  return !response_list.shutdown();
}
```
From RunLoopOnce we can see that Horovod's workflow is roughly what was described earlier: a producer-consumer pattern. The controller plays the coordinator role here: the ranks exchange information about which requests are ready, and for the requests that are ready, the collective operation is executed.
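For the producer side of this pattern, the sketch below shows, with hypothetical names, how a framework call hands work to the background thread. Horovod's real entry point is EnqueueTensorAllreduce in operations.cc, which stores a TensorTableEntry (buffers plus a status callback) and pushes a Request onto the shared queue; the types and signature here are simplified stand-ins, not Horovod's API.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

using StatusCallback = std::function<void(bool /*ok*/)>;
// Abstract "push a request onto the background queue" hook.
using EnqueueFn = std::function<void(const std::string& /*tensor_name*/)>;

struct TensorEntry {
  std::string name;
  StatusCallback callback;  // invoked by the background thread when the collective finishes
};

// Simplified tensor table: the consumer looks entries up by name when it
// performs the fused operation (PerformOperation in Horovod).
class TensorTable {
 public:
  void add(TensorEntry entry) {
    auto name = entry.name;
    table_[name] = std::move(entry);
  }
  TensorEntry* find(const std::string& name) {
    auto it = table_.find(name);
    return it == table_.end() ? nullptr : &it->second;
  }
 private:
  std::unordered_map<std::string, TensorEntry> table_;
};

// The framework thread returns immediately; completion is reported
// asynchronously through the callback once the background thread has run the
// fused collective.
void enqueue_allreduce(TensorTable& table, const EnqueueFn& push_request,
                       const std::string& tensor_name, StatusCallback cb) {
  table.add({tensor_name, std::move(cb)});
  push_request(tensor_name);
}
```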
Next, let's look at ComputeResponseList. It is a very long function, about 380 lines; to make it easier to see what it is doing, the response-cache and stall-check code has been stripped out here:
```cpp
ResponseList Controller::ComputeResponseList(std::atomic_bool& shut_down,
                                             HorovodGlobalState& state) {
  CacheCoordinator cache_coordinator(response_cache_.num_active_bits());

  // message queue used only in this cycle
  std::deque<Request> message_queue_tmp;
  tensor_queue_.PopMessagesFromQueue(message_queue_tmp);
  for (auto& message : message_queue_tmp) {
    if (message.request_type() == Request::JOIN) {
      state.joined = true;
      cache_coordinator.set_uncached_in_queue(true);
      continue;
    }
  }

  // Flag indicating that the background thread should shut down.
  bool should_shut_down = shut_down;
  cache_coordinator.set_should_shut_down(should_shut_down);

  ResponseList response_list;
  response_list.set_shutdown(cache_coordinator.should_shut_down());

  {
    // Collect all tensors that are ready to be reduced. Record them in the
    // tensor count table (rank zero) or send them to rank zero to be
    // recorded (everyone else).
    std::vector<std::string> ready_to_reduce;

    if (is_coordinator_) {
      // On the coordinator (rank 0), record the tensors that are already ready.
      // Note that at this point the requests in message_queue_tmp come from the
      // coordinator process itself.
      while (!message_queue_tmp.empty()) {
        // Pop the first available message
        Request message = message_queue_tmp.front();
        message_queue_tmp.pop_front();

        if (message.request_type() == Request::JOIN) {
          state.joined_size++;
          continue;
        }

        bool reduce = IncrementTensorCount(message, state.joined_size);
        if (reduce) {
          ready_to_reduce.push_back(message.tensor_name());
        }
      }

      // Receive the ready tensors from the other ranks.
      std::vector<RequestList> ready_list;
      RecvReadyTensors(ready_to_reduce, ready_list);

      // Process the requests from the other ranks. size_ is the number of ranks.
      for (int i = 1; i < size_; ++i) {
        auto received_message_list = ready_list[i];
        for (auto& received_message : received_message_list.requests()) {
          auto& received_name = received_message.tensor_name();

          // A JOIN message means a rank has joined; Horovod supports elastic training.
          if (received_message.request_type() == Request::JOIN) {
            state.joined_size++;
            continue;
          }

          // Increment the count of ranks for which this tensor is ready; once
          // all ranks are ready, the tensor will be announced to the other ranks.
          bool reduce = IncrementTensorCount(received_message, state.joined_size);
          if (reduce) {
            ready_to_reduce.push_back(received_name);
          }
        }
        if (received_message_list.shutdown()) {
          // Received SHUTDOWN request from one of the workers.
          should_shut_down = true;
        }
      }

      // Check if tensors from previous ticks are ready to reduce after Joins.
      if (state.joined_size > 0) {
        for (auto& table_iter : message_table_) {
          int count = (int)table_iter.second.size();
          if (count == (size_ - state.joined_size) &&
              std::find(ready_to_reduce.begin(), ready_to_reduce.end(),
                        table_iter.first) == ready_to_reduce.end()) {
            state.timeline.NegotiateEnd(table_iter.first);
            ready_to_reduce.push_back(table_iter.first);
          }
        }
      }

      // This condition looks odd at first sight: group fusion is "disabled",
      // yet we fuse? The flag only disables fusing tensors across different
      // groups; tensors belonging to the same explicitly formed group are
      // still fused here, one group at a time.
      if (state.disable_group_fusion && !group_table_.empty()) {
        // Extract set of common groups from coordinator tensor list and cache hits.
        std::vector<int> common_ready_groups;
        std::unordered_set<int> processed;
        for (const auto& tensor_name : ready_to_reduce) {
          int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
          if (group_id != NULL_GROUP_ID &&
              processed.find(group_id) == processed.end()) {
            common_ready_groups.push_back(group_id);
            processed.insert(group_id);
            // Leaving name in list, to be skipped later.
          }
        }

        // For each ready group, form and fuse response lists independently
        for (auto id : common_ready_groups) {
          std::deque<Response> responses;
          for (const auto& tensor_name : group_table_.GetGroupTensorNames(id)) {
            if (message_table_.find(tensor_name) != message_table_.end()) {
              // Uncached message
              Response response = ConstructResponse(tensor_name, state.joined_size);
              responses.push_back(std::move(response));
            }
          }
          FuseResponses(responses, state, response_list);
        }
      }

      // At this point, rank zero should have a fully updated tensor count
      // table and should know all the tensors that need to be reduced or
      // gathered, and everyone else should have sent all their information
      // to rank zero. We can now do reductions and gathers; rank zero will
      // choose which ones and in what order, and will notify the other ranks
      // before doing each reduction.
      std::deque<Response> responses;

      for (auto& tensor_name : ready_to_reduce) {
        // Skip tensors in group that were handled earlier.
        if (state.disable_group_fusion && !group_table_.empty() &&
            group_table_.GetGroupIDFromTensorName(tensor_name) != NULL_GROUP_ID) {
          continue;
        }

        Response response = ConstructResponse(tensor_name, state.joined_size);
        responses.push_back(std::move(response));
      }

      if (state.joined_size == size_) {
        // All ranks did Join(). Send the response, reset joined size.
        Response join_response;
        join_response.set_response_type(Response::JOIN);
        join_response.add_tensor_name(JOIN_TENSOR_NAME);
        responses.push_back(std::move(join_response));
        state.joined_size = 0;
      }

      FuseResponses(responses, state, response_list);
      response_list.set_shutdown(should_shut_down);

      // Broadcast final results to other ranks.
      SendFinalTensors(response_list);
    } else {
      // On a worker (non-coordinator) rank: send the locally ready tensors to
      // the coordinator, then receive the final list of tensors to process.
      RequestList message_list;
      message_list.set_shutdown(should_shut_down);
      while (!message_queue_tmp.empty()) {
        message_list.add_request(message_queue_tmp.front());
        message_queue_tmp.pop_front();
      }

      // Send ready tensors to rank zero
      SendReadyTensors(message_list);

      // Receive final tensors to be processed from rank zero
      RecvFinalTensors(response_list);
    }
  }

  if (!response_list.responses().empty()) {
    std::string tensors_ready;
    for (const auto& r : response_list.responses()) {
      tensors_ready += r.tensor_names_string() + "; ";
    }
  }

  // Reassign cache bits based on current cache order.
  response_cache_.update_cache_bits();

  return response_list;
}
```
In Horovod, each GPU corresponds to one training process, called a rank. With 4 GPUs, the processes get ranks [0, 1, 2, 3]. The rank-0 process acts as the master (coordinator), and the remaining processes are workers. In ComputeResponseList, each worker sends the tensors it has ready to the master. Once a tensor is ready on every rank, the master notifies the other ranks that the collective operation can be performed on that tensor.
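The heart of this negotiation is just a per-tensor counter kept on rank 0, which is what IncrementTensorCount maintains. Below is a minimal sketch of the idea with invented names; the real function also checks that shapes, dtypes, and op types are consistent across ranks and accounts for joined ranks, all of which is omitted here.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Rank 0 keeps, per tensor name, the list of ranks that have requested it.
// Once every rank (world_size) has asked for the same tensor, the tensor is
// ready: a Response can be constructed and broadcast to all ranks.
class ReadinessTable {
 public:
  explicit ReadinessTable(int world_size) : world_size_(world_size) {}

  // Returns true exactly once: when the last missing rank reports the tensor.
  bool increment(const std::string& tensor_name, int rank) {
    auto& ranks = table_[tensor_name];
    ranks.push_back(rank);
    if (static_cast<int>(ranks.size()) == world_size_) {
      table_.erase(tensor_name);  // negotiation for this tensor is finished
      return true;
    }
    return false;
  }

 private:
  int world_size_;
  std::unordered_map<std::string, std::vector<int>> table_;
};
```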
Next, let's look at the other important function that appears in RunLoopOnce: PerformOperation. This function is fairly straightforward and mainly does three things:
Fuse tensors: merge several tensors into one larger tensor before performing the collective operation.
Wait for the data to be ready.
Perform the collective operation.
```cpp
void PerformOperation(Response response, HorovodGlobalState& state) {
  std::vector<TensorTableEntry> entries;
  auto& timeline = horovod_global.timeline;
  if (response.response_type() != Response::JOIN) {
    // Slightly odd: horovod_global is used directly here, while `joined` is
    // read from the `state` parameter (in practice `state` refers to the same
    // global object).
    horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
                                                             state.joined);

    for (auto& e : entries) {
      timeline.Start(e.tensor_name, response.response_type());
    }

    if (entries.size() > 1) {
      // With more than one entry, the tensors can be fused to improve throughput.
      auto first_entry = entries[0];
      // Note: it is OK for different entries to come from different frameworks
      // since buffer allocated here is guaranteed to survive at least till the
      // end of this operation.
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          horovod_global.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
      if (!status.ok()) {
        LOG(DEBUG, horovod_global.controller->GetRank()) << "InitializeBuffer Failed";
        for (auto& e : entries) {
          timeline.End(e.tensor_name, nullptr);
          // Callback can be null if the rank sent Join request.
          if (e.callback != nullptr) {
            e.callback(status);
          }
        }
        return;
      }
    }

    // On GPU data readiness is signalled by ready_event.
    // Even when a tensor has been negotiated, we still have to wait for its
    // data to actually reach device memory.
    std::vector<TensorTableEntry> waiting_tensors;
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
        waiting_tensors.push_back(e);
      }
    }
    while (!waiting_tensors.empty()) {
      for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
        if (it->ready_event->Ready()) {
          timeline.ActivityEnd(it->tensor_name);
          timeline.ActivityStart(it->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
          it = waiting_tensors.erase(it);
        } else {
          ++it;
        }
      }
      std::this_thread::sleep_for(std::chrono::nanoseconds(100));
    }
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityEnd(e.tensor_name);
      }
    }
  }

  // Finally, perform the collective operation.
  Status status;
  try {
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    LOG(DEBUG, horovod_global.controller->GetRank()) << "ExecuteOperation Failed";
    status = Status::UnknownError(ex.what());
  }

  if (!status.in_progress()) {
    for (auto& e : entries) {
      timeline.End(e.tensor_name, status.ok() ? e.output : nullptr);
      // Callback can be null if the rank sent Join request.
      if (e.callback != nullptr) {
        e.callback(status);
      }
    }
  }
}
```
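The fusion step itself is conceptually simple: copy the small tensors back to back into one pre-allocated buffer, run a single collective over that buffer, and scatter the results back. Below is a minimal CPU-only sketch of this idea with invented names; Horovod's real implementation (the FusionBufferManager plus the MemcpyInFusionBuffer helpers in the collective ops) additionally handles devices, streams, data types, and alignment.

```cpp
#include <cstring>
#include <vector>

// Hypothetical flat tensor: a float buffer plus its element count.
struct FlatTensor {
  float* data;
  size_t size;
};

// Pack all tensors back-to-back into `fusion_buffer`, run one collective over
// the fused buffer (stubbed out here), then copy each slice back in place.
void fused_allreduce(std::vector<FlatTensor>& tensors,
                     std::vector<float>& fusion_buffer) {
  size_t total = 0;
  for (const auto& t : tensors) total += t.size;
  fusion_buffer.resize(total);

  // Copy in.
  size_t offset = 0;
  for (const auto& t : tensors) {
    std::memcpy(fusion_buffer.data() + offset, t.data, t.size * sizeof(float));
    offset += t.size;
  }

  // One collective over the whole buffer instead of one per tensor, e.g.
  // MPI_Allreduce(MPI_IN_PLACE, fusion_buffer.data(), total, MPI_FLOAT,
  //               MPI_SUM, MPI_COMM_WORLD);

  // Copy out.
  offset = 0;
  for (auto& t : tensors) {
    std::memcpy(t.data, fusion_buffer.data() + offset, t.size * sizeof(float));
    offset += t.size;
  }
}
```

Fusing amortizes the per-call latency of NCCL/MPI/Gloo, which is why Horovod batches small gradients up to TensorFusionThresholdBytes before launching a collective.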
This concludes the overview of Horovod's main workflow.