1. GPU Settings
2. Distribution Strategies
1. GPU Settings
(1) GPU configuration API list
tf.debugging.set_log_device_placement: print which device each operation is placed on (a small logging example follows this list)
tf.config.experimental.set_visible_devices: set the devices visible to (i.e., usable by) this process
tf.config.experimental.list_logical_devices: list the logical devices
tf.config.experimental.list_physical_devices: list the physical devices
tf.config.experimental.set_memory_growth: grow GPU memory on demand, allocating only as much as is actually used
tf.config.experimental.VirtualDeviceConfiguration: define a logical partition (virtual device) on a physical GPU
tf.config.set_soft_device_placement: let TensorFlow place an op on a suitable device automatically instead of specifying one by hand
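A minimal sketch of what device-placement logging looks like (the matmul below is illustrative, not from the notes): once logging is enabled, each executed op prints the device it ran on.
import tensorflow as tf

tf.debugging.set_log_device_placement(True)   # log the device of every executed op
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[1.0, 1.0], [1.0, 1.0]])
c = tf.matmul(a, b)   # prints something like "Executing op MatMul in device .../device:GPU:0"
print(c)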
(2) Memory growth and virtual devices
Because the batch_size has to be adjusted for distributed training, the input should be a tf.data Dataset; model.fit_generator does not support distributed training. Memory growth and virtual devices must be configured before the GPUs are initialized, so changing these settings requires restarting the kernel; you cannot go back and rerun the cell.
tf.debugging.set_log_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')
# make only the first GPU visible to this process
tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
# enable memory growth so each GPU allocates memory on demand
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
print(len(gpus))
logical_gpu = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpu))
(3) Logical partitioning of a GPU
tf.debugging.set_log_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')
# make only the first GPU visible to this process
tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
# split the physical GPU into several logical GPUs (here two 1 GB virtual devices)
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
print(len(gpus))
logical_gpu = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpu))
(4) Manually placing parts of the model on different GPUs
tf.debugging.set_log_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
print(len(gpus))
logical_gpu = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpu))

# first half of the model lives on the first logical GPU
with tf.device(logical_gpu[0].name):
    model = keras.models.Sequential()
    # convolutional layers
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu',
                                  input_shape=(28, 28, 1)))
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    # pooling layer
    model.add(keras.layers.MaxPool2D(pool_size=2))
    # convolutional layers
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))

# second half of the model lives on the second logical GPU
with tf.device(logical_gpu[1].name):
    # pooling layer
    model.add(keras.layers.MaxPool2D(pool_size=2))
    # convolutional layers
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    # pooling layer
    model.add(keras.layers.MaxPool2D(pool_size=2))
    # fully connected layers
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128, activation='relu'))
    model.add(keras.layers.Dense(10, activation='softmax'))

model.compile(loss="sparse_categorical_crossentropy",
              optimizer="adam",
              metrics=["accuracy"])
(5) Automatic (soft) device placement
tf.debugging.set_log_device_placement(True)
# let TensorFlow fall back to a suitable device automatically
tf.config.set_soft_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')
# enable memory growth for every GPU
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
print(len(gpus))
logical_gpu = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpu))
2. Distribution Strategies
Why distribute training?
·the data volume is large
·the model is too complex: many parameters and a heavy compute load
TensorFlow's built-in distribution strategies (a short construction sketch follows the list):
MirroredStrategy:
①synchronous distributed training
②intended for the single-machine, multi-GPU case
③every GPU holds a full copy of all model parameters, and the copies are kept in sync
④data parallelism: each batch is split into N shards, one per GPU; gradients are aggregated, and the aggregated update is applied to the parameters on every GPU
CentralStorageStrategy:
①a variant of MirroredStrategy
②parameters are stored not on every GPU but on a single device (the CPU, or the only GPU)
③computation (everything except the parameter update) still runs in parallel on all GPUs
MultiWorkerMirroredStrategy:
①similar to MirroredStrategy
②intended for the multi-machine, multi-GPU case
TPUStrategy:
①similar to MirroredStrategy
②intended for TPUs
ParameterServerStrategy:
①asynchronous distributed training
②better suited to large-scale distributed systems
③machines are split into two roles: parameter servers and workers
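As a rough construction sketch (the class locations below assume the tf.distribute.experimental namespace of the TF 2.x release these notes appear to target), the strategies are created roughly as follows; in practice you construct only the one you need, and the multi-worker, TPU, and parameter-server variants additionally need cluster configuration (e.g. via TF_CONFIG), which is omitted here.
import tensorflow as tf

mirrored = tf.distribute.MirroredStrategy()                              # one machine, all visible GPUs
central = tf.distribute.experimental.CentralStorageStrategy()            # parameters kept on a single device
multi_worker = tf.distribute.experimental.MultiWorkerMirroredStrategy()  # one replica group per worker
# tpu_strategy = tf.distribute.experimental.TPUStrategy(resolver)        # resolver: a TPUClusterResolver
# ps_strategy = tf.distribute.experimental.ParameterServerStrategy()     # parameter servers + workers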
(1) Synchronous distribution
All replicas process their own shard of each batch in lockstep: gradients are aggregated (e.g. with all-reduce) before any parameters are updated, so every replica always holds identical weights.
(2) Asynchronous distribution
Each worker computes gradients on its own data and pushes updates to the parameter servers independently, without waiting for the others, so different workers may train on slightly stale parameters.
(3) Distributed training with keras
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    # build and compile the model inside the strategy scope (layers omitted here)
    model = keras.models.Sequential([...])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer="adam",
                  metrics=["accuracy"])
(4) Distributed training with estimator
strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = keras.estimator.model_to_estimator(model, config=config)
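As a rough usage sketch (the input_fn below is a hypothetical helper, not part of the notes), the resulting estimator is then driven by an input_fn that returns a tf.data Dataset, with features keyed by the Keras input layer's name as model_to_estimator expects.
def input_fn():
    # hypothetical input pipeline; in these notes make_dataset(...) in section (5) plays this role
    dataset = tf.data.Dataset.from_tensor_slices(
        ({model.input_names[0]: x_train_scaled}, y_train))
    return dataset.shuffle(10000).repeat().batch(batch_size)

estimator.train(input_fn=input_fn, max_steps=5000)  # illustrative step count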
(5) Distributing a custom training loop
Dataset construction
def make_dataset(image, labels, epochs, batch_size, shuffle=True):
    dataset = tf.data.Dataset.from_tensor_slices((image, labels))
    if shuffle:
        dataset = dataset.shuffle(10000)
    dataset = dataset.repeat(epochs).batch(batch_size).prefetch(50)
    return dataset

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    batch_size_per_replica = 128
    batch_size = batch_size_per_replica * len(logical_gpu)  # 128 examples per logical GPU
    train_dataset = make_dataset(x_train_scaled, y_train, 1, batch_size)
    valid_dataset = make_dataset(x_valid_scaled, y_valid, 1, batch_size)
    train_dataset_distribute = strategy.experimental_distribute_dataset(train_dataset)
    valid_dataset_distribute = strategy.experimental_distribute_dataset(valid_dataset)
Model construction
with strategy.scope():
    model = keras.models.Sequential()
    # convolutional layers
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu',
                                  input_shape=(28, 28, 1)))
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    # pooling layer
    model.add(keras.layers.MaxPool2D(pool_size=2))
    # convolutional layers
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    # pooling layer
    model.add(keras.layers.MaxPool2D(pool_size=2))
    # convolutional layers
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    # pooling layer
    model.add(keras.layers.MaxPool2D(pool_size=2))
    # fully connected layers
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128, activation='relu'))
    model.add(keras.layers.Dense(10, activation='softmax'))
Model training
with strategy.scope():
    # per-example loss; averaging is done manually below
    # (for non-distributed training use Reduction.SUM_OVER_BATCH_SIZE instead)
    loss_func = keras.losses.SparseCategoricalCrossentropy(
        reduction=keras.losses.Reduction.NONE)

    def compute_loss(labels, predictions):
        per_replica_loss = loss_func(labels, predictions)
        # average over the global batch, not just this replica's shard
        return tf.nn.compute_average_loss(per_replica_loss,
                                          global_batch_size=batch_size)

    # define metrics
    test_loss = keras.metrics.Mean(name="test_loss")
    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")

    opt = keras.optimizers.SGD(1e-3)

    def train_step(inputs):
        images, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        opt.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss

    @tf.function
    def distributed_train_step(inputs):
        per_replica_average_loss = strategy.experimental_run_v2(train_step, args=(inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM,
                               per_replica_average_loss,
                               axis=None)

    def test_step(inputs):
        images, labels = inputs
        predictions = model(images)
        t_loss = loss_func(labels, predictions)
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)

    @tf.function
    def distributed_test_step(inputs):
        strategy.experimental_run_v2(test_step, args=(inputs,))
epochs = 10
for epoch in range(epochs):
    total_loss = 0.0
    num_batches = 0
    # iterate the distributed dataset so each replica receives its own shard of the batch
    for x in train_dataset_distribute:
        start_time = time.time()
        total_loss += distributed_train_step(x)
        run_time = time.time() - start_time
        num_batches += 1
        print('\rtotal_loss: %3.3f, num_batches: %d, average_loss: %3.3f, time: %3.3f'
              % (total_loss, num_batches, total_loss / num_batches, run_time), end="")
    train_loss = total_loss / num_batches
    for x in valid_dataset_distribute:
        distributed_test_step(x)
    print('\rEpoch: %d, Loss: %3.3f, Acc: %3.3f, Val_Loss: %3.3f, Val_Acc: %3.3f'
          % (epoch + 1, train_loss, train_accuracy.result(),
             test_loss.result(), test_accuracy.result()))
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()
Non-distributed custom training (for comparison)
loss_func = keras.losses.SparseCategoricalCrossentropy(
    reduction=keras.losses.Reduction.SUM_OVER_BATCH_SIZE)
# define metrics
test_loss = keras.metrics.Mean(name="test_loss")
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")
test_accuracy = keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")

opt = keras.optimizers.SGD(1e-3)

@tf.function
def train_step(inputs):
    images, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_func(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(gradients, model.trainable_variables))
    train_accuracy.update_state(labels, predictions)
    return loss

@tf.function
def test_step(inputs):
    images, labels = inputs
    predictions = model(images)
    t_loss = loss_func(labels, predictions)
    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)

epochs = 10
for epoch in range(epochs):
    total_loss = 0.0
    num_batches = 0
    for x in train_dataset:
        start_time = time.time()
        total_loss += train_step(x)
        run_time = time.time() - start_time
        num_batches += 1
        print('\rtotal_loss: %3.3f, num_batches: %d, average_loss: %3.3f, time: %3.3f'
              % (total_loss, num_batches, total_loss / num_batches, run_time), end="")
    train_loss = total_loss / num_batches
    for x in valid_dataset:
        test_step(x)
    print('\rEpoch: %d, Loss: %3.3f, Acc: %3.3f, Val_Loss: %3.3f, Val_Acc: %3.3f'
          % (epoch + 1, train_loss, train_accuracy.result(),
             test_loss.result(), test_accuracy.result()))
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()