完整demo直接见最后
代码首先是用estimator来写的,看官网上用MirroredStrategy来进行多gpu训练,基本不用改原生代码,只是加了个train_distribute,但是实际过程真是各种坑
首先看下官网给的例子:distribute
def model_fn(features, labels, mode):
layer = tf.layers.Dense(1)
logits = layer(features)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {"logits": logits}
return tf.estimator.EstimatorSpec(mode, predictions=predictions)
loss = tf.losses.mean_squared_error(
labels=labels, predictions=tf.reshape(logits, []))
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
if mode == tf.estimator.ModeKeys.TRAIN:
train_op = tf.train.GradientDescentOptimizer(0.2).minimize(loss)
return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
def input_fn():
features = tf.data.Dataset.from_tensors([[1.]]).repeat(100)
labels = tf.data.Dataset.from_tensors(1.).repeat(100)
return tf.data.Dataset.zip((features, labels))
distribution = tf.contrib.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=distribution)
classifier = tf.estimator.Estimator(model_fn=model_fn, config=config)
classifier.train(input_fn=input_fn)
classifier.evaluate(input_fn=input_fn)
很简单是不,基本没啥玩意,也能够跑的起来,但是问题来了,跑完发现,一是看不到日志,二是只保存了一个ckpt,另外看input_fn也跟平时训练的数据不一样,没有batch啊,平时训练来按batch来更新的啊,那就再改下,为匹配数据,把model_fn也改下,改完如下,主要是加了个global_step,以及变了下数据维度
def model_fn(features, labels, mode, params=None):
logits = tf.layers.dense(features, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {"logits": logits}
return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
loss = tf.losses.mean_squared_error(labels=labels, predictions=logits) # shape
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode=mode, loss=loss)
if mode == tf.estimator.ModeKeys.TRAIN:
global_step = tf.train.get_or_create_global_step() # global_step
train_op = tf.train.GradientDescentOptimizer(0.2).minimize(loss, global_step=global_step)
return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
def input_fn_build():
x = np.random.rand(1000, 10).astype(np.float32)
y = np.array([np.random.randint(0, 1) for _ in range(1000)], dtype=np.float32).reshape(-1, 1)
def input_fn():
dataset = tf.data.Dataset.from_tensor_slices((x, y)) # 注意这里用的是 from_tensor_slices
dataset = dataset.repeat(1000).batch(64).prefetch(2)
iterator = dataset.make_one_shot_iterator()
features, labels = iterator.get_next()
return features, labels
return input_fn
ok,改完了,那让咱跑下吧,好家伙,出错了,还好,有报错信息
看意思是返回的数据有问题,那改成跟上面一样呗
return features, labels
=====>
return tf.data.Dataset.zip((features, labels))
完了,又出错了
真tm尴尬,zip的输入还是不对,对tf的数据类型又不太熟悉,又把前面的信息看了下,上网找了找,还真找到了,https://www.coder.work/article/2011280,意思是 tf.data
与分布策略(可与keras和tf.Estimator
s一起使用)一起使用时,输入fn应返回tf.data.Dataset
,这看明白了,直接把dataset返回得了
def input_fn():
dataset = tf.data.Dataset.from_tensor_slices((x, y)) # 注意这里用的是 from_tensor_slices
dataset = dataset.repeat(1000).batch(64).prefetch(2)
return dataset
这下可以了,能正常跑了,终于跑通了多卡,但是,还是没有日志,直接就保存了模型,又看了看代码,原来是没加日志显示等级,tf.logging.set_verbosity(tf.logging.INFO),加了下,终于可以正常跑了,完整代码如下:
import shutil
import numpy as np
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.INFO)
def model_fn(features, labels, mode, params=None):
logits = tf.layers.dense(features, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {"logits": logits}
return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
loss = tf.losses.mean_squared_error(labels=labels, predictions=logits)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode=mode, loss=loss)
if mode == tf.estimator.ModeKeys.TRAIN:
global_step = tf.train.get_or_create_global_step()
train_op = tf.train.GradientDescentOptimizer(0.2).minimize(loss, global_step=global_step)
return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
def input_fn_build():
x = np.random.rand(1000, 10).astype(np.float32)
y = np.array([np.random.randint(0, 1) for _ in range(1000)], dtype=np.float32).reshape(-1, 1)
def input_fn():
dataset = tf.data.Dataset.from_tensor_slices((x, y))
dataset = dataset.repeat(1000).batch(64).prefetch(2)
return dataset
return input_fn
if __name__ == '__main__':
distribution = tf.contrib.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
save_checkpoints_secs=60*10,
keep_checkpoint_max=2,
train_distribute=distribution
)
ddir = './logs/temp'
shutil.rmtree(ddir, ignore_errors=True)
classifier = tf.estimator.Estimator(model_fn=model_fn, config=config, model_dir=ddir)
classifier.train(input_fn=input_fn_build())
classifier.evaluate(input_fn=input_fn_build())
内心欢喜,本以为可以了,oh, too young too simple,一比较发现,这两个卡还不如一个卡跑的快,实际数据,一个卡利用率可到96%左右,两个反而变成了三四十,gg。。。
应当是数据传输的时候影响效率,后面再接着研究。。。