TensorFlow分布式部署

分布式TensorFlow简介

参数服务器

当计算模型越来越大,模型的参数越来越多,多到模型参数的更新,一台机器的性能都不够时,我们需要将参数分开到不同的机器去存储和更新。

参数服务器可以是多台机器组成的集群,类似于分布式的存储结构。主要用来解决参数存储和更新的性能问题。

in-graph模式

in-graph模式下数据分发在一个节点上。

这种方式配置简单,其他结算节点只需join操作,暴露一个网络接口,等在那里接受任务就好。

但坏处就是训练数据的分发在一个节点上,要把训练数据分到不同的机器上,严重影响了并发的训练速度。

between-graph模式

between-graph模式下,训练的参数保存在参数服务器,数据不用分发,数据分片的保存在各个计算节点,各个计算节点自己算自己的,算完后把要更新的参数告诉参数服务器,参数服务器更新参数。

这种模式的优点是不用进行训练数据的分发,尤其数据量在TB级的时候,节省了大量的时间,所以大数据深度学习推荐使用between-graph模式。

同步更新和异步更新

in-graph和between-graph模式都支持同步更新和异步更新。

在同步更新的时候,每次梯度更新,要等所有分发的数据计算完成,返回结果,把梯度累加算了均值之后,再更新参数。这样的好处是loss的下降比较稳定,但这个的坏处也比较明显,处理的速度取决于最慢的那个分片的计算时间。

在异步更新时,所有的计算节点,自己算自己的,更新参数也是自己更新自己的计算结果,这样的优点是计算速度快,计算资源能得到充分利用,但是缺点是loss的下降不稳定,抖动大。

在数据量小的情况下,各个节点的计算能力比较均衡的情况下,推荐使用同步模式;数据量很大,各个机器的计算性能参差不齐的情况下,推荐使用异步的方式。

例子

在上一章中,我们在RHEL7.1上搭建了TensorFlow 0.12的环境,为了验证分布式的效果,我们按照上一章的步骤再搭建一台虚拟机。

两台虚拟机的ip分别为

192.168.139.128  
192.168.139.130

功能说明

代码实现的功能:对于表达式

Y = 2 * X + 10

其中,X是输入,Y是输出,现在有很多X和Y的样本,怎么估算出来weight是2和biasis是10.所有的节点,不管是ps节点还是worker节点,运行的都是同一份代码,只是命令参数指定不一样。

执行命令

在这里我们将192.168.139.130虚拟机当作参数服务器和worker1服务器,将192.168.139.128虚拟机当作worker2服务器。

ps节点执行

CUDA_VISIBLE_DEVICES='' python dis_1.py --ps_hosts=192.168.139.130:2222 --worker_hosts=192.168.139.130:2224,192.168.139.128:2225 --job_name=ps --task_index=0

worker1节点执行

CUDA_VISIBLE_DEVICES='0' python dis_1.py --ps_hosts=192.168.139.130:2222 --worker_hosts=192.168.139.130:2224,192.168.139.128:2225 --job_name=worker --task_index=0

worker2节点执行

CUDA_VISIBLE_DEVICES='1' python dis_1.py --ps_hosts=192.168.139.130:2222 --worker_hosts=192.168.139.130:2224,192.168.139.128:2225 --job_name=worker --task_index=1

==坑1==

在一开始运行时,worker1节点运行的很好,但worker2节点始终处于类似连接失败并一直在尝试重连的情况,经过排查后发现是防火墙的问题。所以我们在运行此例时需要关闭防火墙

systemctl status firewalld  查看防火墙状态
systemctl stop firewalld  重启后会重新开启
systemctl disable firewalld  禁用

==坑2==
在例子没跑完时,我关闭了terminal,重新开启一个terminal,运行上述命令,发现worker2依旧不能运行,

ps -aux |grep 2225
kill xxx

查看了端口使用情况,发现2225端口被刚才关闭的命令和刚才运行的命令同时占用,所以我们需要kill到前者的进程。

代码解释

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS

代码说明:通过命令行参数可以传入ps节点的ip和端口, worker节点的ip和端口。ps节点就是paramter server的缩写, 主要是保存和更新参数的节点, worker节点主要是负责计算的节点。这里说的节点都是虚拟的节点,不一定是物理上的节点;多个节点用逗号分隔

ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")

# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

# Create and start a server for the local task.
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

代码说明:

  • ClusterSpec的定义,需要把你要跑这个任务所有的ps和worker的节点的ip和端口信息都包含进去,所有的节点都要执行这段代码,大家就互相知道了,这个集群里都有哪些成员,不同成员的类型是什么,是ps节点还是worker节点
  • tf.train.Server定义开始,每个节点就不一样了。根据执行的命令参数不同,决定了这个任务是哪个任务。如果任务名字是ps的话,程序就join到这里,作为参数更新的服务,等待其他worker节点给他提交参数更新的数据。如果是worker任务,就继续执行后面的计算任务。
  • replica_device_setter,根据TensorFlow的文档对这个的解释,在这个with语句之下定义的参数,会自动分配到参数服务器上去定义,如果有多个参数服务器,就轮流循环分配。
train_X = np.linspace(-1.0, 1.0, 100)
train_Y = 2.0 * train_X + np.random.randn(*train_X.shape) * 0.33 + 10.0

X = tf.placeholder("float")
Y = tf.placeholder("float")

w = tf.Variable(0.0, name="weight")
b = tf.Variable(0.0, name="bias")
loss = tf.square(Y - tf.mul(X, w) - b)

global_step = tf.Variable(0)

train_op = tf.train.AdagradOptimizer(0.01).minimize(
    loss, global_step=global_step)

saver = tf.train.Saver()
summary_op = tf.summary.merge_all()
init_op = tf.global_variables_initializer()

定义计算逻辑

# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir="/tmp/train_logs",
                         init_op=init_op,
                         summary_op=summary_op,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=600)

# The supervisor takes care of session initialization, restoring from
# a checkpoint, and closing when done or an error occurs.
with sv.managed_session(server.target) as sess:
    # Loop until the supervisor shuts down or 1000000 steps have completed.
    step = 0
    while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        for (x, y) in zip(train_X, train_Y):
            _, step = sess.run([train_op, global_step],
                               feed_dict={X: x,
                                          Y: y})

        loss_value = sess.run(loss, feed_dict={X: x, Y: y})
        print("Step: {}, loss: {}".format(step, loss_value))

# Ask for all the services to stop.
sv.stop()

代码说明:

  • Supervisor,类似于一个监督者,因为分布式了,很多机器都在运行,像参数初始化、保存模型、写summary,这个supervisor帮你一起弄起来了,就不用自己手动去做这些事情了,而且在分布式的环境下涉及到各种参数的共享,其中的过程自己手工写也不好写,于是TensorFlow就给大家包装好这么一个东西。这里的参数is_chief比较重要。
    在所有的计算节点里还是有一个主节点的,这个主节点来负责初始化参数,模型的保存,summary的保存。logdir就是保存和装载模型的路径。不过这个似乎启动后会去这个logdir的目录去看有没有checkpoint的文件,有的话就自动装载了,没用就用init_op指定的初始化参数,好像没有参数指定不让它自动load的
  • 主worker节点负责模型参数初始化等工作,在这个过程中,其他worker节点等待主节点完成初始化工作,等主节点初始化完成后,就可以跑数据了。
  • 这里的global_step的值,是可以所有计算节点共享的,在执行optimizer的minimize的时候,会自动+1, 虽有可以通过这个可以知道所有的计算节点一共计算了多少步了。

完整代码

#!/usr/bin/env python

import tensorflow as tf
import numpy as np

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS


def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")

    # Create a cluster from the parameter server and worker hosts.
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":

        train_X = np.linspace(-1.0, 1.0, 100)
        train_Y = 2.0 * train_X + np.random.randn(*train_X.shape) * 0.33 + 10.0

        X = tf.placeholder("float")
        Y = tf.placeholder("float")

        # Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):

            w = tf.Variable(0.0, name="weight")
            b = tf.Variable(0.0, name="bias")
            loss = tf.square(Y - tf.mul(X, w) - b)

            global_step = tf.Variable(0)

            train_op = tf.train.AdagradOptimizer(0.01).minimize(
                loss, global_step=global_step)

            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()
            init_op = tf.global_variables_initializer()

        # Create a "supervisor", which oversees the training process.
        sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                                 logdir="/tmp/train_logs",
                                 init_op=init_op,
                                 summary_op=summary_op,
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=600)

        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        with sv.managed_session(server.target) as sess:
            # Loop until the supervisor shuts down or 1000000 steps have completed.
            step = 0
            while not sv.should_stop() and step < 1000000:
                # Run a training step asynchronously.
                # See `tf.train.SyncReplicasOptimizer` for additional details on how to
                # perform *synchronous* training.
                for (x, y) in zip(train_X, train_Y):
                    _, step = sess.run([train_op, global_step],
                                       feed_dict={X: x,
                                                  Y: y})

                loss_value = sess.run(loss, feed_dict={X: x, Y: y})
                print("Step: {}, loss: {}".format(step, loss_value))

        # Ask for all the services to stop.
        sv.stop()


if __name__ == "__main__":
    tf.app.run()

本文参考:
白话tensorflow分布式部署和开发

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容

  • 1. Tensorflow 实现原理 实现原理 TensorFlow有一个重要组件client,顾名思义,就是客户...
    iccccing阅读 8,177评论 0 5
  • 儿时的傻事,大多都淡忘了。唯有那件事,至今还历历在目,每一想起也禁不住扑哧一笑。 那年我三岁,邻居王伯伯抱着...
    爱柠檬的女孩_阅读 265评论 2 1
  • 什么是GCD 全称是Grand Central Dispatch纯C语言,提供了非常多强大的函数GCD会自动管理线...
    仰天风吹雪阅读 290评论 0 0
  • 心灵感应是一种能够读取到远距对方思想的能力,是一种不必靠语言传达的沟通能力。在神秘主义者看来,它是一种超能力;...
    Joanne_fa80阅读 1,964评论 0 2