2017年,Facebook在ResNet50上采用了分布式训练,开启了分布式深度学习的的大规模应用时代。通过使用分布在32个服务器上的256个GPU进行训练,大大提升了模型训练的速度和精度。在面对海量数据和复杂模型时,计算资源很容易称为深度学习的瓶颈,分布式学习的应用则很好地解决了这一难题。在推荐广告领域,算法工程师面对的往往是数以亿计的大规模稀疏数据,分布式学习就显得尤为重要。作为工业界的宠儿(雾),tensorflow提供了一整套分布式训练的代码框架,许多公司都会直接采用或进行二次开发。本文简要介绍了分布式训练的基本知识,仅做入门参考。
1. 参数服务器(Parameter Server, PS)架构
在推荐系统领域,PS是目前业界分布式TF训练最常用的结构。
分布式TF中所有设备被总称为一个cluster(集群),每个cluster会包含若干server,每个server会执行一个task,因此server和task是一一对应的,一个cluster可以看作由server或task组成。
此外,TF又将一系列相似的task集合称为一个job,因此cluster也可以看做job的集合。在PS架构中有两种job:ps和worker,其中ps负责存储网络参数和更新梯度,worker负责读入训练数据并计算梯度。
图中的训练流程分为5步:
(1). worker读取训练数据
(2). 在worker内经过一次前向传播并计算梯度
(3). 将计算好的梯度上传至PS
(4). 在PS内更新网络的权重
(5). 将更新后的参数重新传到worker上.
2. 参数的更新方式:同步更新和异步更新
同步更新是指所有的worker将梯度计算好之后,在PS中进行合并,一次性更新网络的参数。这种方式能够保证模型的参数是同时更新的,保证了训练的稳定性。但在实际应用中会出现worker计算速度不同,导致整个训练被个别worker拖慢的情况。
异步更新则是每个worker将计算好的梯度异步传给PS,每次PS只更新改梯度对应的网络参数。这种方式训练速度更快,但由于参数是分批更新的,会影响到算法的收敛速度。
在实际应用中可以采取折中的方案,即在异步更新的基础上设置一个延迟阈值,当有worker延迟超过该阈值时,就强制所有worker停下等待其他worker迭代完成。
3. 分布式TF代码示例
3.1 使用tf.train.ClusterSpec来创建一个cluster:
cluster = tf.train.ClusterSpec({
"worker": [
"worker0.example.com:2222",
"worker1.example.com:2222",
"worker2.example.com:2222"
],
"ps": [
"ps0.example.com:2222",
"ps1.example.com:2222"
]})
在TF中,job由job name(string)来标识,而task用task index(int)来标识,则cluster中的每个task 就可以用job name + task index来唯一标识。(这里的的job name即PS和worker,task index即各自的编号,如PS0, PS1,worker0,worker1...)
上述cluster中包含2种job:ps和worker,以及5个task:
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1
3.2 使用tf.train.Server创建各个task的server
如创建上述第一个worker的server:
server = tf.train.Server(cluster, job_name="worker", task_index=0)
3.3 使用tf.device调用cluster中的各个server
创建完server后,现在需要开始构建TF的graph。这里使用tf.device指定cluster中的各个server,来完成graph的创建。
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
3.4 创建Session来执行计算图
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
这里注意需要在tf.Session()中指定target参数。
grpc是Google rpc的缩写,是google开源的rpc框架,分布式tensorflow底层通信的方式。
4. Replicated Training
Replicated training指分布式TF中的数据并行策略,即多个worker使用不同的mini-batch来训练,然后更新ps上存放的参数。在了解Replicated training之前,首先要了解client的概念。
Client
在前述的创建cluster和server步骤中,搭建了TF训练所需要的分布式环境。当真正进行训练时,需要创建一个client程序, 来创建TF的计算图并建立一个session来与cluster中的设备进行交互。
目前主流的replicated Training做法是使用Between-graph replication方式,这种模式下的参数放在ps上,每个worker包含一个client,它们构建相同的Graph。TF中的tf.train.replica_device_setter
函数专门用来方便Graph的构建。
#创建cluster
cluster_spec = {
"ps": ["ps0:2222", "ps1:2222"],
"worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
cluster = tf.train.ClusterSpec(cluster_spec)
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d"%FLAGS.task_index, cluster=cluster)):
#构建计算图
v1 = tf.variable(...)
v2 = tf.variable(...)
v3 = tf.variable(...)
使用tf.train.replica_device_setter
可以自动将Graph中的参数放在ps上,并将Graph的计算部分放在当前worker上。
在使用Between-graph replication进行训练时,一般会指定一个chief worker,协调各个worker之间的训练,并完成模型初始化,模型checkpoint保存和恢复等公共操作。这里可以使用tf.train.MonitoredTrainingSession
来创建client 的Session,并指定哪个worker为chief worker。
5. 总结
分布式训练可以做到数据并行和训练并行。数据并行指DNN的网络参数分片地存储在不同的ps上,训练并行是指不同的worker独立计算梯度,并更新PS中的网络参数。分布式训练可以大大提高复杂神经网络的训练效率,也是推荐、广告等算法工程师的必备技能。
参考资料