深度学习分布式训练实战(二)——TF

本篇博客主要介绍TF的分布式训练,重点从代码层面进行讲解。理论部分可以参考深度学习分布式训练实战(一)

TF的分布式实现方式

TF的分布式有两种实现方式,一种是图内分布式(In-graph replication);一种是图间分布式(Between-graph replication)。图内分布式中,计算图只有一个,需要一个中心节点分配计算任务并更新参数,由于中心节点的存在,中心节点容易成为瓶颈。图间分布式中,计算图有多个,但是不同计算图的相同变量通过tf.train.replica_device_setter函数放到同一个服务器上,这种情况下,各个计算图相互独立(参数只有一份,计算图有多个),并行度更高,适合异步更新,同步更新下相对麻烦,不过TF给了接口tf.train.SyncReplicasOptimizer函数来帮助实现参数的同步更新,所以图间分布式应用相对广泛一些。
关于数据并行,模型并行可以参考深度学习分布式训练实战(一)

大部分情况下,我们使用图间分布式,图内分布式一般只会在模型太大的情况下使用。对于图间分布式,其基于gRPC通信框架,模型参数只有一份,计算图有多份,一个master负责创建主session,多个worker执行计算图任务。模型训练过程中,每个计算图计算出各自梯度,然后对参数进行更新。更新方式有两种:同步更新,异步更新。

分布式TF中,TF需要建立一个集群,然后在集群中建立两个job,一个是ps job,负责参数初始化,参数更新,一个job下面可以有多个task(有多个task,说明有多台机器,或者GPU负责参数初始化,更新)。一个是woker job,负责计算图的运算,计算梯度,一个worker job下面也可以有很多个task(有多个task,说明有多台机器,或者GPU负责运行计算图)。

参数异步更新的分布式训练

参数同步更新基本上和这里写的差不多TensorFlow分布式部署
。只不过为了方便在本机上调试,所以改了一点点。(自己的笔记本没有GPU),介绍下面几个重点的语句:
tf.train.ClusterSpec():创建一个集群对象
tf.train.Server():在这个集群上面创建一个服务器,根据实际情况,可以是参数服务器,也可以是计算服务器
tf.train.Supervisor():创建一个监视器,就是用来监控训练过程的,个人感觉主要就是方便恢复模型训练,其logdir参数为训练日志目录,如果里面有模型,则直接恢复训练。所以如果想重新训练,需要删除这个目录。
sv.managed_session():启动Session,相比于其他启动Session的方式,多了一些功能。可以参考TensorFlow 中三种启动图用法

具体代码如下:

# tensorflow distribute train by asynchronously update 

import tensorflow as tf
import numpy as np

tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "one of ps or worker")
tf.app.flags.DEFINE_integer("task_index", 0, "0, 1, 2...")

FLAGS = tf.app.flags.FLAGS

def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")

    # Create a cluster from the parameter server and worker server
    cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts})

    # Create and start a server for the local task
    server = tf.train.Server(cluster, job_name = FLAGS.job_name, task_index=FLAGS.task_index)
    # 如果是参数服务器,则直接阻塞,等待计算服务器下达参数初始化,参数更新命令就可以了。
    # 不过“下达命令”这个是TF内部实现的,没有显式实现
    if FLAGS.job_name == "ps":
        server.join() 
    elif FLAGS.job_name == "worker":
        # Assigns ops to the local worker by default
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
            train_X = np.linspace(-1.0, 1.0, 100)
            train_Y = 2.0 * train_X + np.random.randn(*train_X.shape) * 0.33 + 10.0
            X = tf.placeholder("float")
            Y = tf.placeholder("float")

            w = tf.Variable(0.0, name="weight")
            b = tf.Variable(0.0, name="bias")
            loss = tf.square(Y - tf.multiply(X, w) - b)

            global_step = tf.Variable(0)
            train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)

            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()

            init_op = tf.global_variables_initializer()

            # Create a "supervisor", which oversees the training process.
            sv = tf.train.Supervisor(is_chief=(FLAGS.task_index==0),
                logdir="~/Downloads/log/",
                init_op=init_op,
                summary_op = summary_op,
                saver=saver,
                global_step=global_step,
                save_model_secs=600)

            # The supervisor takes care of session initialization, retoring from a
            # checkpoint, and closing when done or an error occurs.
            with sv.managed_session(server.target) as sess:
                step = 0
                while step < 1000000:
                    # Run a training step asynchronously
                    for (x, y) in zip(train_X, train_Y):
                        _, step =sess.run([train_op, global_step], feed_dict={X:x, Y:y})
                    loss_value = sess.run(loss, feed_dict={X:x, Y:y})
                    print("Step: {}, loss: {}".format(step, loss_value))

            # Ask for all the services to stop
            sv.stop()

if __name__=="__main__":
    tf.app.run()

打开三个终端,分别输入以下三个命令,就可以看到训练结果了:

CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=0
CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=0
CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=1

这里有一点要注意,控制计算图使用哪一块GPU是通过命令行设置CUDA_VISIBLE_DEVICES来实现的,而不是写死在代码里面的。
此外,还有一点不方便的地方,如果有很多台机器,则需要把多份这份代码拷贝多次,在每台机器上分别运行上述命令才可以,还是不太方便的。

参数同步更新的分布式训练

同步更新稍微麻烦了点,需要加几行代码(重点参考了《Tensorflow实战》一书),改动部分已经标明,代码如下:

# tensorflow distribute train by synchronously update 

import tensorflow as tf
import numpy as np

tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "one of ps or worker")
tf.app.flags.DEFINE_integer("task_index", 0, "0, 1, 2...")

FLAGS = tf.app.flags.FLAGS
def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    n_works = len(worker_hosts)
    # Create a cluster from the parameter server and worker server
    cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts})

    # Create and start a server for the local task
    server = tf.train.Server(cluster, job_name = FLAGS.job_name, task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        # Assigns ops to the local worker by default
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
            train_X = np.linspace(-1.0, 1.0, 100)
            train_Y = 2.0 * train_X + np.random.randn(*train_X.shape) * 0.33 + 10.0
            X = tf.placeholder("float")
            Y = tf.placeholder("float")

            w = tf.Variable(0.0, name="weight")
            b = tf.Variable(0.0, name="bias")
            loss = tf.square(Y - tf.multiply(X, w) - b)

            global_step = tf.Variable(0)

            # for Syncmously updata
            # 同步更新模式下,需要等待所有计算图计算出梯度,然后梯度求平均,tf.train.SyncReplicasOptimizer实现了这种封装
            opt = tf.train.SyncReplicasOptimizer(
                tf.train.AdagradOptimizer(0.01),
                replicas_to_aggregate=n_works,
                total_num_replicas=n_works,
                )
            train_op = opt.minimize(loss, global_step=global_step)
            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()

            init_op = tf.global_variables_initializer()

            # for Syncmously updata
            # 同步模式下,主计算服务器需要协调不同计算服务器计算得到的梯度,并更新参数。
            if FLAGS.task_index==0:
                # 定义协调不同计算服务器的队列,并定义初始化操作
                chief_queue_runner = opt.get_chief_queue_runner()
                init_tokens_op = opt.get_init_tokens_op(0)

            # Create a "supervisor", which oversees the training process.
            sv = tf.train.Supervisor(is_chief=(FLAGS.task_index==0),
                logdir="~/Downloads/log/",
                init_op=init_op,
                summary_op = summary_op,
                saver=saver,
                global_step=global_step,
                save_model_secs=600)



            # The supervisor takes care of session initialization, retoring from a
            # checkpoint, and closing when done or an error occurs.

            # for Syncmously updata. 
            # prepare_or_wait_for_session used by sync. It will wait until main node ok and parameter init over!
            # for Syncmously updata. 
            # 这里用的是prepare_or_wait_for_session。
            # 相比于异步更新的managed_session:只要某个计算服务器参数初始化完毕就可以开始,
            # prepare_or_wait_for_session:等待所有计算服务器参数初始化完毕(参数只有一份,后续的计算服务器应该不需要初始化了?只需要和参数服务器建立一个关系?),主节点协调工作完毕后,开始。
            with sv.prepare_or_wait_for_session(server.target) as sess:
                # for Syncmously updata
                if FLAGS.task_index==0:
                    # 开始训练之前,主计算服务器需要启动协调同步更新的队列,并执行初始化操作
                    sv.start_queue_runners(sess, [chief_queue_runner])
                    sess.run(init_tokens_op)

                step = 0
                while step < 100000:
                    # Run a training step asynchronously
                    for (x, y) in zip(train_X, train_Y):
                        _, step =sess.run([train_op, global_step], feed_dict={X:x, Y:y})
                    loss_value = sess.run(loss, feed_dict={X:x, Y:y})
                    print("Step: {}, loss: {}".format(step, loss_value))

            # Ask for all the services to stop
            sv.stop()

if __name__=="__main__":
    tf.app.run()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容