论文原文:Scaling Distributed Machine Learning with the Parameter Server
参考:知乎Parameter Server 详解
简书不支持latex,建议直接去博客看neinei的小栈
Task
构建一个可扩展、容错性好并且对用户友好的机器学习系统。
Motivation
1.(机器学习的特点)
机器学习系统相比于其他系统而言,有一些自己的独特特点。例如:
- 迭代性:模型的更新并非一次完成,需要循环迭代多次;
- 容错性:即使在每个循环中产生一些错误,模型最终仍能收敛;
- 参数收敛的非均匀性:有些参数几轮迭代就会收敛,而有的参数却需要上百轮迭代。
而且工业界需要训练大型的机器学习模型,一些广泛应用的特定的模型在规模上有两个特点:
- 参数很大,超过单个机器的容纳的能力(大型LR和神经网络)
- 训练数据太大,需要并行提速(大数据)
在机器学习和深度学习领域,分布式的优化已经成了一种先决条件。因为单机已经解决不了目前快速增长的数据和参数了。
而往往这些模型的参数需要被所有的worker节点频繁的访问,这就会带来很多问题和挑战:
2.(大数据的特点)
访问这些巨量的参数,需要大量的网络带宽支持;
很多机器学习算法都是连续型的,只有上一次迭代完成(各个worker都完成)之后,才能进行下一次迭代,这就导致了如果机器之间性能差距大(木桶理论),就会造成性能的极大损失;
-
在分布式中,容错能力是非常重要的。很多情况下,算法都是部署到云环境中的(这种环境下,机器是不可靠的,并且job也是有可能被抢占的);
为了说明最后一点,我们从一家大型互联网公司的一个集群中收集了三个月的所有工作日志。我们在表1中显示了服务于生产环境的批量机器学习任务的统计信息。这里,任务失败主要是由于在没有必要的容错机制的情况下被抢占或丢失机器所致。与许多研究中的工作完全在群集上运行而没有争用的情况不同,容错是现实世界部署中的必要条件。
因此在上述需求背景下,类似MapReduce的框架就不能满足需求了。
设计一个上述系统时,我们需要解决很多问题。类似频繁访问修改模型参数所需要消耗的巨大带宽,以及如何提高并行度,减少同步等待造成的延迟等等。而参数服务器即为解决这种需求提出的。参数服务器是一个编程框架,用于方便分布式并行程序的编写,其中重点在于对大规模参数的分布式存储和协同的支持。ParameterServer适用于大规模深度学习系统,大规模Logistic Regression系统,大规模主题模型,大规模矩阵分解等依赖SGD或L-BFGS最优化的算法。
BackGround
1.releated work
对于机器学习分布式优化,有很多大公司在做了,包括:Amazon,Baidu,Facebook,Google,Microsoft 和 Yahoo。也有一些开源的项目,比如:YahooLDA 和 Petuum 和Graphlab。 总结一下:
本篇论文提出的ParameterServer属于第三代的parameter server。
参数服务器的概念最早来自于Alex Smola于2010年提出的并行LDA的框架。它通过采用一个分布式的Memcached作为存放参数的存储,这样就提供了有效的机制用于分布式系统中不同的Worker之间同步模型参数,而每个Worker只需要保存他计算时所以来的一小部分参数即可。第一代 parameter server:缺少灵活性和性能 —— 仅使用memcached(key, value) 键值对存储作为同步机制。 YahooLDA 通过改进这个机制,增加了一个专门的服务器,提供用户能够自定义的更新操作(set, get, update)
后来由Google的Jeff Dean进一步提出了第一代Google大脑的解决方案:DistBelief。DistBelief将巨大的深度学习模型分布存储在全局的参数服务器中,计算节点通过参数服务器进行信息传递,很好地解决了SGD和L-BFGS算法的分布式训练问题。第二代parameter server用bounded delay模型来改进YahooLDA,但是却进一步限制了worker线程模型。
在后来就是李沐所在的DMLC组所设计的参数服务器。根据论文中所写,该parameter server属于第三代参数服务器,就是提供了更加通用的设计。架构上包括一个Server Group和若干个Worker Group。 能够解决上述问题。
首先来比较一下parameter server 跟通用的分布式系统之间的差别吧。
通用的分布式系统通常都是:每次迭代都强制同步,通常在几十个节点上,它们的性能可以表现的很好,但是在大规模集群中,这样的每次迭代强制同步的机制会因为木桶效应变得很慢。
Mahout 基于 Hadoop,MLI 基于 Spark,它们(Spark与MLI)采用的都是 Iterative MapReduce 的架构。它们能够保持迭代之间的状态,并且执行策略也更加优化了。但是,由于这两种方法都采用同步迭代的通信方式,使得它们很容易因为个别机器的低性能导致全局性能的降低。
直观感受一下,当其中一个节点运行时间过长会发生什么:
为了解决这个问题,Graphlab 采用图形抽象的方式进行异步调度通信。但是它缺少了以 MapReduce 为基础架构的弹性扩展性,并且它使用粗粒度的snapshots来进行恢复,这两点都会阻碍到可扩展性。parameter server 正是吸取Graphlab异步机制的优势,并且解决了其在可扩展性方面的劣势。
看看异步迭代是如何提高性能的:
2.Parameter Server的优势
- Efficient Communication:高效的通信。网络通信开销是机器学习分布式系统中的大头,因此parameter server基本尽了所有的努力来降低网络开销。
其中最重要的一点就是:异步通信。因为是异步通信,所以不需要停下来等一些慢的机器执行完一个iter,这就大大减少了延迟。 - Flexible consistency models:宽松的一致性要求进一步减少了同步的成本和延时。并非所有算法都天然的支持异步和随机性,有的算法引入异步后可能收敛会变慢,因此parameter server 允许算法设计者根据自身的情况来做算法收敛速度和系统性能之间的权衡trade-off。
- Elastic Scalability:使用一致性哈希算法,分布式的hash表,使得新的Server可以随时动态插入集合中,无需重新运行系统。
- Fault Tolerance and Durability:
节点故障是不可避免的。在本文的PS中,对于server节点来说,使用链备份来应对;而对于Worker来说,因为worker之间互相不通信,因此在某个worker失败后,新的worker可以直接加入。Vector clocks 保证了经历故障之后还是能运行良好; - Ease of Use:全局共享的参数可以被表示成各种形式:vector, matrices或是sparse类型,同时框架还提供对线性代数类型提供高性能的多线程计算库。
3.Parameter Server系统架构
在parameter server中,每个 server 实际上都只负责分到的部分参数(servers共同维持一个全局的共享参数),而每个 work 也只分到部分数据和处理任务;
servers 与 workers 之间的通信如下:
上图的resource manager可以先放一放,因为实际系统中这部分往往是复用现有的资源管理系统,比如yarn或者mesos;底下的training data毋庸置疑的需要类似GFS的分布式文件系统的支持;剩下的部分就是参数服务器的核心组件了。
图中画了一个server group和三个worker group;实际应用中往往也是类似,server group用一个,而worker group按需配置;server manager是server group中的管理节点,一般不会有什么逻辑,只有当有server node加入或退出的时候,为了维持一致性哈希而做一些调整。
Worker group中的task schedule则是一个简单的任务协调器,一个具体任务运行的时候,task schedule负责通知每个worker加载自己对应的数据,然后去server node上拉取一个要更新的参数分片,用本地数据样本计算参数分片对应的变化量,然后同步给server node;server node在收到本机负责的参数分片对应的所有worker的更新后,对参数分片做一次update。
server节点可以跟其他 server 节点通信,每个server负责自己分到的参数,server group 共同维持所有参数的更新。
server manager node 负责维护一些元数据的一致性,比如各个节点的状态,参数的分配情况等;
worker 节点之间没有通信,只跟自己对应的server进行通信。每个worker group有一个task scheduler,负责向worker分配任务,并且监控worker的运行情况。当有新的worker加入或者退出,task scheduler 负责重新分配任务。
下图中算法1是最直接的算法,流程如图所示,从参数服务器上获取参数,计算梯度,把参数在发给参数服务器更新。
算法3则是优化后的算法,具体的优化点下面会讲到。
下面一一分析PS特点:
3.1 (Key,Value) Vectors
parameter server 中,参数都是可以被表示成(key, value)的集合,比如一个最小化损失函数的问题,key 就是 feature ID,而 value 就是它的权值;对于LDA,key就是单词ID和文档ID的组合,value就是词语的计数对于稀疏参数,不存在的key,就可以认为是0。
把参数表示成 k-v, 形式更自然,易于理解,更易于编程解;
workers 跟 servers 之间通过 push 跟 pull 来通信。worker 通过 push 将计算好的梯度发送到 server,然后通过 pull 从 server 更新参数。为了提高计算性能和带宽效率,parameter server 允许用户使用 Range Push 跟 Range Pull 操作(使用区间更新的方式)。
3.2 Range Push and Pull
在节点间数据传输采用Push、Pull操作。
在Algorithm 1,每个工作节点Push它全部的本地梯度到参数服务器,然后Pull更新的参数回来。
在algorithm 3,每次传输一个范围的keys。
-
假设 R 是需要push或pull的 key 的range,那么可以进行如下操作:
w.push(R, dest) w.pull(R, dest)
参数服务器支持range-based push and pull。 设R为一个key的范围,w.push(R,dest)发送在R范围内所有存在的w到目的地(可以是一个节点,一个节点群如服务器群)。w.pull(R,dest)从目的地读取在$R$范围内所有存在的w。
如果我们把R设置为全部key范围,则全部参数向量w会被通信,若我们把R设置为单个key,则只有独立的entry会被发送。这个接口可以扩展到通信任何分享相同keys作为w的本地数据结构。
3.3 User-Defined Functions on the Server
在算法1中,服务器端更新参数的时候还要计算正则化项。这样的操作可以由用户自定义指定。同时,这样的自定义函数还可以扩展到和参数同key的数据上,比如梯度g,可以将梯度g作为自定义函数的参数穿进去更新参数,这样,服务器端也不用存梯度变量了,节省了内存。
3.4 Asynchronous Tasks and Dependency
参数服务器和工作节点之间的通信都属于远程调用,那么,远程调用是比较耗时的行为,如果每次都保持同步的话,那么训练相对于单节点来说是减慢了许多的,因为远程调用的耗时。因而,PS框架让远程调用成为一部调用,比如参数的push和pull发出之后,立即使用当前值开始进行下一步的梯度计算,如上图,迭代11发出push和pull的请求后,立马开始进行梯度计算,而此时,使用的还是迭代10的值。
这其实是一种折衷,失去了模型的一致性,提升了速度。
如果 iter1 需要在 iter0 computation,push 跟 pull 都完成后才能开始,那么就是Synchronous,反之就是Asynchronous.
Asynchronous Task:能够提高系统的效率(因为节省了很多等待的过程),但是,它的缺点就是容易降低算法的收敛速率;
所以,系统性能跟算法收敛速率之间是存在一个trade-off的,你需要同时考虑:
- 算法对于参数非一致性的敏感度;
- 训练数据特征之间的关联度;
- 硬盘的存储容量;
3.5 Flexible Consistency
考虑到用户使用的时候会有不同的情况,parameter server 为用户提供了多种任务依赖方式:
Sequential: 这里其实是 synchronous task,序列式的,即完全同步的.任务之间是有顺序的,只有上一个任务完成,才能开始下一个任务;
Eventual: 跟 sequential 相反,所有任务之间没有顺序,各自独立完成自己的任务,
Bounded Delay: 这是sequential 跟 eventual 之间的trade-off,可以设置一个 τ 作为最大的延时时间。也就是说,只有 >τ 之前的任务都被完成了,才能开始一个新的任务;极端的情况:
τ=0,情况就是 Sequential;
τ=∞,情况就是 Eventual;
上面👆algorithm 3就是bounded delay 的 PGD (proximal gradient descent)算法的系统运行流程:
注意: τ 不是越大越好的,具体要取多大,得看具体情况,贴一张李少帅做的实验(Ad click prediction):
6. User-Defined Filters(用户自定义过滤)
在工作节点这一端对梯度进行过滤,如果梯度并不是影响那么大,就不占用网络去更新,等积累一段时间之后再去做更新。
对于机器学习优化问题比如梯度下降来说,并不是每次计算的梯度对于最终优化都是有价值的,用户可以通过自定义的规则过滤一些不必要的传送,再进一步压缩带宽 cost:
- 发送很小的梯度值是低效的:
因此可以自定义设置,只在梯度值较大的时候发送;
- 更新接近最优情况的值是低效的:
因此,只在非最优的情况下发送,可通过KKT来判断;
Attention:parameter server的异步性与非凸问题
参数服务器模型更新的时候,worker的模型参数与server的模型参数可能有所不一致。
举例而言,梯度计算需要基于某个特定的参数值(相当于下山,我们只能找到针对特定某点的下山最快的方向,一旦该点变化,则下山最快的方向也就不对了)。问题在于:节点A从server获得参数值后,当计算完梯度后,此时server端的参数值可能已经被节点B所更新了。
但是在非凸问题(例如深度学习的优化)中,这反而是个好事,引入了随机性。这是因为非凸问题本身就不是梯度下降能够解决的,正常的单机迭代肯定会收敛到局部最优。有时我们常常会用一些额外的方法来跳出局部最优:
- 多组参数值初始化
- 模拟退火
- 随机梯度下降
而上面所说的PS框架正好利用异步性引入了随机性,有助于跳出局部最优。因此在Google的DistBelief框架中,提出了Downpour SGD算法,就是尽最大可能利用了这种随机性。
4.Implementation
在PS的实现层面,又有很多的细节。这些细节有很多都是利用系统的特性想出来的。
servers 使用 一致性hash 来存储参数k-v键值对。
用链式复制方式来提高系统容错性。
不同于先前的键值对系统,参数服务器充分利用以范围为基础的通信,压缩数据和基于向量时钟的范围。
4.1 Vector Clock
考虑到复杂任务的依赖图以及快速恢复的需要,使参数服务器中的每个键值对与一个向量时钟对应,向量时钟记录了在这个键值对上的每个独立节点的时间,用来跟踪参数的更新和防止重复发送数据。基于此,通信中的梯度更新数据中也应该有时间戳,防止重复更新。
向量时钟在追踪聚合状态或是阻止重复发送数据等方面是方便的,但对于n个节点、m个参数,向量时钟需要$O(nm)$的空间,对于上千个节点以及数十亿个参数,存储和带宽是不可能的满足的。但是,参数服务器基于范围的通信模式,很多参数都有相同的时间戳:如果节点在一个范围内push参数,那么这些与这个节点相关的参数的时间戳几乎是一样的。因此,它们可以被压缩到单个范围的向量时钟中。更特别地,假设$vc_i(k)$是节点$i$在key $k$的时间,考虑key的范围为$R$,则范围向量时钟$vc_i(R)=t$表示对于任意key $k\in R$,$vc_i(k)=t$。开始时,每个节点$i$只有一个范围向量时钟,从初始时间戳覆以及范围0开始覆盖了全部的参数key空间。每个范围设置可能分开(split)范围并且创造至多3个新的向量时钟(见Algorithm 2,在刚开始的时候,所有的参数都是一个大向量,时间戳为0,每次来一个范围的更新,如果能找到对应的key,那么直接更新那个key的时间戳就可以了。否则,就可能会对某些向量进行切分,好在,来一次更新请求,最多能把一个区间切分为三个区间)。这样对于$k$个独立范围、m个节点数,至多$O(mk)$个向量时钟。
4.2 Messages
一条 message 包括:时间戳,len(range)对k-v.
这是parameter server 中最基本的通信格式,不仅仅是共享的参数才有,task 的message也是这样的格式,只要把这里的(key, value) 改成 (task ID, 参数/返回值)。
由于机器学习问题通常都需要很高的网络带宽,因此信息的压缩是必须的。
key的压缩:因为训练数据通常在分配之后都不会发生改变,因此worker没有必要每次都发送相同的key,只需要接收方在第一次接收的时候缓存起来就行了。第二次,worker不再需要同时发送key和value,只需要发送value 和 key list的hash就行。这样瞬间减少了一般的通信量。
value的压缩: 假设参数时稀疏的,那么就会有大量的0存在。因此,为了进一步压缩,我们只需要发送非0值。parameter server使用 Snappy compression library快速压缩库来压缩数据、高效去除0值。
**key **的压缩和 **value **的压缩可以同时进行。
用户自定义过滤:
对于机器学习优化问题比如梯度下降来说,并不是每次计算的梯度对于最终优化都是有价值的,用户可以通过自定义的规则过滤一些不必要的传送,再进一步压缩带宽cost:
-
发送很小的梯度值是低效的:
因此可以自定义设置,只在梯度值较大的时候发送; -
更新接近最优情况的值是低效的:
因此,只在非最优的情况下发送,可通过KKT来判断;
4.3 Consistent Hashing
PS在数据一致性方面,使用的是一致性哈希算法,然后每个节点备份其逆时针的k个节点的参数。
一致性哈希算法:即将数据按照某种hash算法映射到环上,然后将机器按照同样的hash算法映射到环上,将数据存储到环上顺时针最近的机器上。
如图,k=3,则S1复制了S2,S3和S4的key。)
有两种方式保证主节点与备份节点之间的数据一致性。
4.4 Replication and Consistency
- 默认的复制方式:Chain Replication链备份(强一致性)
即worker0 push的更新x,在server0经过处理后变成了f(x)。只有在f(x)备份到了server1后,这次的push才算结束,woker0才会收到ack。
该备份方式对于一些需要频繁更新参数的算法,可能造成难以承受的网络带宽开销。(相当于把网络带宽乘以k倍,k是备份的个数),因此parameter server框架也支持如下方法。
2.Replication after Aggregation:先聚合多个worker节点的更新,再备份先聚合多个worker节点的更新,然后再备份。只有数据被备份了之后,worker0和worker1才会收到ack。
这种方法会造成worker收到的ack的时间被推迟,但是在宽松的一致性条件下(即woker无需等到ack就可以继续下一轮迭代)却无关紧要。
4.5 Server Management
添加新server:
- server manager给新节点分配key
- range。这个会导致其他server的key range更改新节点获取key range的data,并且将其备份到备份节点去
- server manager广播节点的更改。
删除server:
server maneger通过心跳信号确定一个server死亡,然后就会该server的key range分配给多个节点。
4.6 Worker Management
添加新woker:
- task scheduler给新woker分配数据
- 该新worker从别的wokers或是文件系统重新读取训练数据,然后从servers处pull下参数
- task scheduler广播该变化,有可能造成其他worker释放部分train data
删除woker:
删除worker通常是直接不管该节点,这是因为丢失一部分训练数据通常并不会影响模型训练结果,但是恢复一个worker的话可能会占用较多资源。
当然用户也可以选择用新worker来替代丢失的worker。