MXNet: Barrier

1. KVStore里的Barrier

在mxnet的分布式训练里,主要模式就是参数服务器。每个worker或者agent就是一台machine,server用于参数的更新。那么,当我们期望在不同的worker之间进行同步的时候,就会需要到barrier这个方法。
当代码运行在worker的时候,我们可以通过调用kv._barrier()来进行同步。它的作用就是,会阻塞代码运行,直到每个worker都运行了kv._barrier()。然后接着运行。这样就实现了同步。
那么它是怎么做到的呢?

通过源码,我们不难发现,python端的接口调用了c++端的方法:

void Barrier() override {
    ps::Postoffice::Get()->Barrier(ps_worker_->get_customer()->customer_id(), ps::kWorkerGroup);
}

这个全局的PostofficeBarrier方法的部分源码如下:

void Postoffice::Barrier(int customer_id, int node_group) {
  // 省略部分代码
  // 省略部分代码


  std::unique_lock<std::mutex> ulk(barrier_mu_);
  barrier_done_[0][customer_id] = false;
  Message req;
  req.meta.recver = kScheduler;
  req.meta.request = true;
  req.meta.control.cmd = Control::BARRIER;
  req.meta.app_id = 0;
  req.meta.customer_id = customer_id;
  req.meta.control.barrier_group = node_group;
  req.meta.timestamp = van_->GetTimestamp();
  CHECK_GT(van_->Send(req), 0);
  barrier_cond_.wait(ulk, [this, customer_id] {
      return barrier_done_[0][customer_id];
    });
}

可以看到该方法会首先对barrier_mu_上锁,之后将对应的barrier_done_设置为false。然后将这次的barrier信息发送给scheduler。告诉scheduler需要进行一次barrier。然后就阻塞等待barrier_done_被设置为true,代表完成了barrier,也就是其他的worker也都进行了barrier。

那么问题就变成了,每个worker都是怎么直到其他worker也进行了barrier的?

首先我们要知道,在参数服务器也就是PS中,每个进程都会建立kvstore。如果是worker,会在构造函数中运行如下代码:

if (IsWorkerNode()) {
      int new_customer_id = GetNewCustomerId();
      ps_worker_ = new ps::KVWorker<char>(0, new_customer_id);
      ps::StartAsync(new_customer_id, "mxnet\0");
      if (!ps::Postoffice::Get()->is_recovery()) {
        ps::Postoffice::Get()->Barrier(
          new_customer_id,
          ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler);
      }
    }

其中ps::StartAsync如下:

inline void StartAsync(int customer_id, const char* argv0 = nullptr) {
  Postoffice::Get()->Start(customer_id, argv0, false);
}

也就是说,worker在建立起ps_worker_后,开始运行postoffice,而postoffice的Start会进行一系列的操作,并调用van_->Start,接着vanStart会进行一系列的初始化后,开启接受消息的线程,也就是

receiver_thread_ = std::unique_ptr<std::thread>(
            new std::thread(&Van::Receiving, this));

receiving函数会使用ProcessBarrierCommand处理barrier信号,该函数会++barrier_count_[group],也就是将对应group的barrier次数进行统计。当barrier_count_[group]等于这个group的个数的时候。它会发送类似于ACK的返回信息。

然后worker会调用Manage方法来处理该message。Manage发现是barrier的返回信息,将barrier_done_设置为true,然后将等待的线程唤醒。也就是python端调用barrier后被阻塞的地方。

至此,就完成了一次worker之间的barrier。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • iOS多线程编程 基本知识 1. 进程(process) 进程是指在系统中正在运行的一个应用程序,就是一段程序的执...
    陵无山阅读 6,364评论 1 14
  • 写在前面的话 代码中的# > 表示的是输出结果 输入 使用input()函数 用法 注意input函数输出的均是字...
    FlyingLittlePG阅读 3,237评论 0 9
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 4,383评论 0 5
  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 6,188评论 3 28
  • 一、Python简介和环境搭建以及pip的安装 4课时实验课主要内容 【Python简介】: Python 是一个...
    _小老虎_阅读 6,356评论 0 10

友情链接更多精彩内容