Horovod Source Code Analysis (Part 1)

Horovod is a distributed training framework open-sourced by Uber that supports the mainstream machine learning frameworks (TensorFlow, PyTorch and MXNet). This article walks through Horovod's core implementation, based on version v0.21.1, as well as its integration with the individual frameworks.

Horovod's workflow is fairly simple. A message queue receives requests for three ops: AllReduce, AllGather and Broadcast. A background thread polls the message queue periodically; once it has picked up a batch of ops, it fuses the tensors in those ops and then performs the corresponding collective operation. If a tensor lives in GPU memory, the operation is executed with the NCCL library; if it lives in host memory, MPI or Gloo is used instead.
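To make this producer/consumer structure concrete, here is a minimal, self-contained sketch of the same pattern. It is not Horovod code: the Request struct, EnqueueRequest and BackgroundLoop are invented for illustration, and the real background thread does far more work per cycle (negotiation, fusion, NCCL/MPI dispatch).

// A minimal sketch (not Horovod code) of the producer/consumer pattern described above:
// framework threads enqueue requests, one background thread wakes up every cycle,
// drains the queue and processes the batch.
#include <atomic>
#include <chrono>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct Request { std::string tensor_name; };   // stand-in for Horovod's Request

std::mutex mu;
std::deque<Request> message_queue;             // shared producer/consumer queue
std::atomic<bool> shut_down{false};

void EnqueueRequest(const std::string& name) { // called by "framework" threads
  std::lock_guard<std::mutex> lock(mu);
  message_queue.push_back({name});
}

void BackgroundLoop() {                        // plays the role of the Horovod background thread
  while (!shut_down) {
    std::this_thread::sleep_for(std::chrono::milliseconds(5));  // one "cycle"
    std::vector<Request> batch;
    {
      std::lock_guard<std::mutex> lock(mu);
      while (!message_queue.empty()) {         // drain everything queued this cycle
        batch.push_back(message_queue.front());
        message_queue.pop_front();
      }
    }
    if (!batch.empty()) {                      // a real implementation would fuse + allreduce here
      std::cout << "processing a batch of " << batch.size() << " requests\n";
    }
  }
}

int main() {
  std::thread bg(BackgroundLoop);
  for (int i = 0; i < 10; ++i) EnqueueRequest("grad_" + std::to_string(i));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  shut_down = true;
  bg.join();
}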

Horovod's core code lives in the horovod/common directory. The file operations.cc is effectively Horovod's entry point; it contains important functions such as BackgroundThreadLoop and RunLoopOnce. Following these functions gives a good first picture of how Horovod works.

Let's start with RunLoopOnce. Some optimization code, such as the response cache and auto tuning, has been omitted here:

bool RunLoopOnce(HorovodGlobalState& state) {
  // Check whether a full cycle (CycleTimeMs) has elapsed since the last cycle started
  auto start_time = std::chrono::steady_clock::now();
  auto sleep_duration = state.last_cycle_start +
                        std::chrono::microseconds(long(
                            state.parameter_manager.CycleTimeMs() * 1000.)) -
                        start_time;
  if (sleep_duration > std::chrono::steady_clock::duration::zero()) {
    std::this_thread::sleep_for(sleep_duration);
  }
  state.last_cycle_start = std::chrono::steady_clock::now();

  // Record in the Timeline; the user can later open the Timeline output in Chrome
  if (state.mark_cycles_in_timeline) {
    // Mark start of the new cycle.
    state.timeline.MarkCycleStart();
  }

  auto response_list =
      state.controller->ComputeResponseList(horovod_global.shut_down, state);

  state.mark_cycles_in_timeline =
      state.controller->MarkCyclesInTimelinePending();

  // Perform the collective operation for each response
  for (auto& response : response_list.responses()) {
    PerformOperation(response, horovod_global);
  }

  return !response_list.shutdown();
}
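The first few lines implement the cycle pacing: the background thread aims to start a new cycle every CycleTimeMs milliseconds (configurable via the HOROVOD_CYCLE_TIME environment variable, and adjusted automatically when auto tuning is enabled), sleeping away whatever is left of the previous cycle. A small standalone illustration of that chrono arithmetic, with made-up constants:

// Standalone illustration of the cycle pacing used in RunLoopOnce:
// sleep for whatever is left of the configured cycle before starting the next one.
#include <chrono>
#include <iostream>
#include <thread>

int main() {
  const double cycle_time_ms = 5.0;  // plays the role of parameter_manager.CycleTimeMs()
  auto last_cycle_start = std::chrono::steady_clock::now();

  for (int cycle = 0; cycle < 3; ++cycle) {
    // ... the work for this cycle would happen here ...
    auto now = std::chrono::steady_clock::now();
    auto sleep_duration = last_cycle_start +
                          std::chrono::microseconds(long(cycle_time_ms * 1000.)) - now;
    if (sleep_duration > std::chrono::steady_clock::duration::zero()) {
      std::this_thread::sleep_for(sleep_duration);  // pad the cycle out to cycle_time_ms
    }
    last_cycle_start = std::chrono::steady_clock::now();
    std::cout << "cycle " << cycle << " done\n";
  }
}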

From RunLoopOnce we can see that Horovod's workflow is roughly as described above: a producer/consumer pattern. The controller plays the coordinating role: the ranks exchange information about which requests are ready, and for the requests that are ready everywhere, the collective operation is performed.

Next, let's look at ComputeResponseList. It is a very long function, about 380 lines; to make it easier to see what it does, the response-cache and stall-check code has been stripped out below:

ResponseList Controller::ComputeResponseList(std::atomic_bool& shut_down,
                                             HorovodGlobalState& state) {
  CacheCoordinator cache_coordinator(response_cache_.num_active_bits());

  // message queue used only in this cycle
  std::deque<Request> message_queue_tmp;
  tensor_queue_.PopMessagesFromQueue(message_queue_tmp);
  for (auto& message : message_queue_tmp) {
    if (message.request_type() == Request::JOIN) {
      state.joined = true;
      cache_coordinator.set_uncached_in_queue(true);
      continue;
    }
  }

  // Flag indicating that the background thread should shut down.
  bool should_shut_down = shut_down;
  cache_coordinator.set_should_shut_down(should_shut_down);

  ResponseList response_list;
  response_list.set_shutdown(cache_coordinator.should_shut_down());

  {
    // Collect all tensors that are ready to be reduced. Record them in the
    // tensor count table (rank zero) or send them to rank zero to be
    // recorded (everyone else).
    std::vector<std::string> ready_to_reduce;

    if (is_coordinator_) {
      // On the master process, record the tensors that are already ready. Note that at this
      // point the requests in message_queue_tmp come from the master process itself
      while (!message_queue_tmp.empty()) {
        // Pop the first available message
        Request message = message_queue_tmp.front();
        message_queue_tmp.pop_front();

        if (message.request_type() == Request::JOIN) {
          state.joined_size++;
          continue;
        }

        bool reduce = IncrementTensorCount(message, state.joined_size);
        if (reduce) {
          ready_to_reduce.push_back(message.tensor_name());
        }
      }

      // Receive the ready tensors from the other ranks
      std::vector<RequestList> ready_list;
      RecvReadyTensors(ready_to_reduce, ready_list);

      // Process the requests from the other ranks. size_ is the number of ranks
      for (int i = 1; i < size_; ++i) {
        auto received_message_list = ready_list[i];
        for (auto& received_message : received_message_list.requests()) {
          auto& received_name = received_message.tensor_name();

          // A JOIN message means a new rank has joined; Horovod supports elastic training
          if (received_message.request_type() == Request::JOIN) {
            state.joined_size++;
            continue;
          }

          // Increment the count of ranks for which this tensor is ready; if it is ready on
          // all ranks, it will be sent to the other ranks
          bool reduce = IncrementTensorCount(received_message, state.joined_size);
          if (reduce) {
            ready_to_reduce.push_back(received_name);
          }
        }
        if (received_message_list.shutdown()) {
          // Received SHUTDOWN request from one of the workers.
          should_shut_down = true;
        }
      }

      // Check if tensors from previous ticks are ready to reduce after Joins.
      if (state.joined_size > 0) {
        for (auto& table_iter : message_table_) {
          int count = (int)table_iter.second.size();
          if (count == (size_ - state.joined_size) &&
              std::find(ready_to_reduce.begin(), ready_to_reduce.end(),
                        table_iter.first) == ready_to_reduce.end()) {
            state.timeline.NegotiateEnd(table_iter.first);
            ready_to_reduce.push_back(table_iter.first);
          }
        }
      }

      // This condition is a bit puzzling at first glance: if group fusion is disabled and
      // group_table_ is non-empty, then fuse?
      if (state.disable_group_fusion && !group_table_.empty()) {
        // Extract set of common groups from coordinator tensor list and cache hits.
        std::vector<int32_t> common_ready_groups;
        std::unordered_set<int32_t> processed;
        for (const auto& tensor_name : ready_to_reduce) {
          int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
          if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
            common_ready_groups.push_back(group_id);
            processed.insert(group_id);
            // Leaving name in list, to be skipped later.
          }
        }

        // For each ready group, form and fuse response lists independently
        for (auto id : common_ready_groups) {
          std::deque<Response> responses;
          for (const auto& tensor_name : group_table_.GetGroupTensorNames(id)) {
            if (message_table_.find(tensor_name) != message_table_.end()) {
              // Uncached message
              Response response = ConstructResponse(tensor_name, state.joined_size);
              responses.push_back(std::move(response));
            }
          }
          FuseResponses(responses, state, response_list);
        }
      }

      // At this point, rank zero should have a fully updated tensor count
      // table and should know all the tensors that need to be reduced or
      // gathered, and everyone else should have sent all their information
      // to rank zero. We can now do reductions and gathers; rank zero will
      // choose which ones and in what order, and will notify the other ranks
      // before doing each reduction.
      std::deque<Response> responses;

      for (auto& tensor_name : ready_to_reduce) {
        // Skip tensors in group that were handled earlier.
        if (state.disable_group_fusion &&
            !group_table_.empty() &&
            group_table_.GetGroupIDFromTensorName(tensor_name) != NULL_GROUP_ID) {
          continue;
        }

        Response response = ConstructResponse(tensor_name, state.joined_size);
        responses.push_back(std::move(response));
      }

      if (state.joined_size == size_) {
        // All ranks did Join(). Send the response, reset joined size.
        Response join_response;
        join_response.set_response_type(Response::JOIN);
        join_response.add_tensor_name(JOIN_TENSOR_NAME);
        responses.push_back(std::move(join_response));
        state.joined_size = 0;
      }

      FuseResponses(responses, state, response_list);
      response_list.set_shutdown(should_shut_down);

      // Broadcast final results to other ranks.
      SendFinalTensors(response_list);

    } else {
      // On a non-master rank, send the tensors that are ready locally to the master,
      // then receive the final list of ready tensors
      RequestList message_list;
      message_list.set_shutdown(should_shut_down);
      while (!message_queue_tmp.empty()) {
        message_list.add_request(message_queue_tmp.front());
        message_queue_tmp.pop_front();
      }

      // Send ready tensors to rank zero
      SendReadyTensors(message_list);

      // Receive final tensors to be processed from rank zero
      RecvFinalTensors(response_list);
    }
  }

  if (!response_list.responses().empty()) {
    std::string tensors_ready;
    for (const auto& r : response_list.responses()) {
      tensors_ready += r.tensor_names_string() + "; ";
    }
  }

  // Reassign cache bits based on current cache order.
  response_cache_.update_cache_bits();

  return response_list;
}

In Horovod, every GPU corresponds to one training process, called a rank. With 4 GPUs, for example, the processes have ranks [0, 1, 2, 3]. The rank-0 process acts as the master and the remaining processes are workers. In ComputeResponseList, each worker sends the tensors that are ready on its side to the master. Once a tensor is ready on every rank, the master notifies the other ranks that the collective operation can be performed on that tensor.
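The heart of this negotiation is the per-tensor readiness count maintained on rank 0 by IncrementTensorCount. The following is a minimal standalone sketch of that idea only; it is not the actual Horovod implementation, the MessageTable class name and the simplified (tensor_name, rank) signature are made up for illustration, and the real function works on Request objects and handles joined ranks.

// Minimal sketch of the readiness bookkeeping done on rank 0 (not Horovod code):
// every rank reports "tensor X is ready"; once all world_size ranks have
// reported X, X becomes eligible for a collective operation.
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

class MessageTable {                       // hypothetical name, for illustration only
 public:
  explicit MessageTable(int world_size) : world_size_(world_size) {}

  // Returns true when the tensor has now been reported ready by every rank.
  bool IncrementTensorCount(const std::string& tensor_name, int rank) {
    auto& ranks = table_[tensor_name];
    ranks.insert(rank);
    return static_cast<int>(ranks.size()) == world_size_;
  }

 private:
  int world_size_;
  std::unordered_map<std::string, std::unordered_set<int>> table_;
};

int main() {
  MessageTable table(/*world_size=*/3);
  table.IncrementTensorCount("grad/conv1", 0);
  table.IncrementTensorCount("grad/conv1", 1);
  bool ready = table.IncrementTensorCount("grad/conv1", 2);
  std::cout << "grad/conv1 ready on all ranks: " << std::boolalpha << ready << "\n";
}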

Next, let's look at PerformOperation, the other important function that appears in RunLoopOnce. This function is fairly straightforward and mainly does three things:

Fuse the tensors: merge several tensors into one large tensor before doing the collective operation (see the sketch after this list)

Wait for the data to be ready

Perform the collective operation
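Tensor fusion matters because issuing one collective per small gradient wastes latency and bandwidth. The sketch below illustrates only the idea, not Horovod's fusion buffer code: several small arrays are packed into one contiguous buffer, a single collective is issued on that buffer, and the results are copied back out. FakeCollective and the packing code are purely illustrative stand-ins.

// Conceptual illustration of tensor fusion (not Horovod's implementation):
// copy small tensors into one contiguous buffer, run a single collective on
// the buffer, then copy each slice back to its owner.
#include <cstring>
#include <iostream>
#include <vector>

// Stand-in for "allreduce": here it just scales the buffer in place.
void FakeCollective(std::vector<float>& fusion_buffer) {
  for (float& v : fusion_buffer) v *= 2.0f;
}

int main() {
  std::vector<std::vector<float>> tensors = {{1, 2}, {3, 4, 5}, {6}};

  // 1. Pack all tensors into one fusion buffer.
  std::vector<float> fusion_buffer;
  std::vector<size_t> offsets;
  for (const auto& t : tensors) {
    offsets.push_back(fusion_buffer.size());
    fusion_buffer.insert(fusion_buffer.end(), t.begin(), t.end());
  }

  // 2. One collective call on the fused buffer instead of one per tensor.
  FakeCollective(fusion_buffer);

  // 3. Unpack the results back into the original tensors.
  for (size_t i = 0; i < tensors.size(); ++i) {
    std::memcpy(tensors[i].data(), fusion_buffer.data() + offsets[i],
                tensors[i].size() * sizeof(float));
  }

  std::cout << "first element of second tensor after fusion: " << tensors[1][0] << "\n";
}

With that picture in mind, here is PerformOperation itself: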

void PerformOperation(Response response, HorovodGlobalState& state) {
  std::vector<TensorTableEntry> entries;
  auto& timeline = horovod_global.timeline;
  if (response.response_type() != Response::JOIN) {
    // A little odd here: horovod_global is used directly, while joined is read from state
    horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
                                                             state.joined);

    for (auto& e : entries) {
      timeline.Start(e.tensor_name, response.response_type());
    }

    if (entries.size() > 1) {
      // With more than one entry, the tensors can be fused to improve throughput
      auto first_entry = entries[0];
      // Note: it is OK for different entries to come from different frameworks
      // since buffer allocated here is guaranteed to survive at least till the
      // end of this operation.
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          horovod_global.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
      if (!status.ok()) {
        LOG(DEBUG, horovod_global.controller->GetRank()) << "InitializeBuffer Failed";
        for (auto& e : entries) {
          timeline.End(e.tensor_name, nullptr);
          // Callback can be null if the rank sent Join request.
          if (e.callback != nullptr) {
            e.callback(status);
          }
        }
        return;
      }
    }

    // On GPU data readiness is signalled by ready_event.
    // Even after a tensor has been negotiated, we still have to wait for its data to be
    // synchronized into device memory
    std::vector<TensorTableEntry> waiting_tensors;
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
        waiting_tensors.push_back(e);
      }
    }
    while (!waiting_tensors.empty()) {
      for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
        if (it->ready_event->Ready()) {
          timeline.ActivityEnd(it->tensor_name);
          timeline.ActivityStart(it->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
          it = waiting_tensors.erase(it);
        } else {
          ++it;
        }
      }
      std::this_thread::sleep_for(std::chrono::nanoseconds(100));
    }
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityEnd(e.tensor_name);
      }
    }
  }

  // Finally, the collective operation itself
  Status status;
  try {
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    LOG(DEBUG, horovod_global.controller->GetRank()) << "ExecuteOperation Failed";
    status = Status::UnknownError(ex.what());
  }

  if (!status.in_progress()) {
    for (auto& e : entries) {
      timeline.End(e.tensor_name, status.ok() ? e.output : nullptr);
      // Callback can be null if the rank sent Join request.
      if (e.callback != nullptr) {
        e.callback(status);
      }
    }
  }
}
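The actual backend choice mentioned at the beginning (NCCL for GPU tensors, MPI or Gloo for CPU tensors) happens inside op_manager->ExecuteOperation: the operation manager holds an ordered list of candidate ops and runs the first one that reports itself usable for the given entries. The snippet below is only a simplified sketch of that dispatch pattern; Entry, AllreduceOp, NCCLAllreduce, MPIAllreduce and ExecuteAllreduce are stand-ins, not Horovod's real classes.

// Simplified sketch of priority-based op dispatch (illustrative only):
// try each registered allreduce implementation in order and run the first
// one that is enabled for this batch of entries.
#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

struct Entry { bool on_gpu; };              // stand-in for TensorTableEntry

struct AllreduceOp {                        // stand-in for Horovod's op classes
  virtual ~AllreduceOp() = default;
  virtual bool Enabled(const std::vector<Entry>& entries) const = 0;
  virtual void Execute(const std::vector<Entry>& entries) = 0;
};

struct NCCLAllreduce : AllreduceOp {
  bool Enabled(const std::vector<Entry>& entries) const override {
    return !entries.empty() && entries[0].on_gpu;   // GPU tensors go through NCCL
  }
  void Execute(const std::vector<Entry>&) override { std::cout << "NCCL allreduce\n"; }
};

struct MPIAllreduce : AllreduceOp {
  bool Enabled(const std::vector<Entry>&) const override { return true; }  // CPU fallback
  void Execute(const std::vector<Entry>&) override { std::cout << "MPI allreduce\n"; }
};

void ExecuteAllreduce(const std::vector<std::unique_ptr<AllreduceOp>>& ops,
                      const std::vector<Entry>& entries) {
  for (const auto& op : ops) {
    if (op->Enabled(entries)) {             // first enabled op wins
      op->Execute(entries);
      return;
    }
  }
  throw std::runtime_error("no enabled allreduce op");
}

int main() {
  std::vector<std::unique_ptr<AllreduceOp>> ops;
  ops.push_back(std::make_unique<NCCLAllreduce>());
  ops.push_back(std::make_unique<MPIAllreduce>());
  ExecuteAllreduce(ops, {{/*on_gpu=*/true}});   // prints "NCCL allreduce"
  ExecuteAllreduce(ops, {{/*on_gpu=*/false}});  // prints "MPI allreduce"
}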

That covers Horovod's main workflow.
