In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
本教程演示了如何使用 tf.distribute.Strategy
来进行自定义训练循环。 我们将在流行的 MNIST 数据集上训练一个简单的 CNN 模型。 流行的 MNIST 数据集包含了 60000 张尺寸为 28 x 28 的训练图像和 10000 张尺寸为 28 x 28 的测试图像。
我们用自定义训练循环来训练我们的模型是因为它们在训练的过程中为我们提供了灵活性和在训练过程中更好的控制。而且,使它们调试模型和训练循环的时候更容易。
In [ ]:
# 导入 TensorFlow
import tensorflow as tf
# 帮助库
import numpy as np
import os
print(tf.__version__)
In [ ]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# 向数组添加维度 -> 新的维度 == (28, 28, 1)
# 我们这样做是因为我们模型中的第一层是卷积层
# 而且它需要一个四维的输入 (批大小, 高, 宽, 通道).
# 批大小维度稍后将添加。
train_images = train_images[..., None]
test_images = test_images[..., None]
# 获取[0,1]范围内的图像。
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
tf.distribute.MirroredStrategy
策略是如何运作的?
注意:您可以将下面的所有代码放在一个单独单元内。 我们将它分成几个代码单元用于说明目的。
In [ ]:
# 如果设备未在 `tf.distribute.MirroredStrategy` 的指定列表中,它会被自动检测到。
strategy = tf.distribute.MirroredStrategy()
In [ ]:
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
将图形和变量导出成平台不可识别的 SavedModel 格式。在你的模型保存后,你可以在有或没有范围的情况下载入它。
In [ ]:
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
创建数据集并分发它们:
In [ ]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
In [ ]:
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
return model
In [ ]:
# 创建检查点目录以存储检查点。
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
通常,在一台只有一个 GPU / CPU 的机器上,损失需要除去输入批量中的示例数。
那么,使用 tf.distribute.Strategy
时应该如何计算损失?
举一个例子,假设您有4个 GPU,批量大小为64. 输入的一个批次分布在各个副本(4个 GPU)上,每个副本获得的输入大小为16。
每个副本上的模型使用其各自的输入执行正向传递并计算损失。 现在,相较于将损耗除以其各自输入中的示例数(BATCH_SIZE_PER_REPLICA = 16),应将损失除以GLOBAL_BATCH_SIZE(64)。
为什么这样做?
如何在TensorFlow中执行此操作?
如果您正在编写自定义训练循环,如本教程中所示,您应该将每个示例损失相加并将总和除以 GLOBAL_BATCH_SIZE :
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
或者你可以使用tf.nn.compute_average_loss
来获取每个示例的损失,可选的样本权重,将GLOBAL_BATCH_SIZE作为参数,并返回缩放的损失。
如果您在模型中使用正则化损失,则需要进行缩放多个副本的损失。 您可以使用tf.nn.scale_regularization_loss
函数执行此操作。
建议不要使用tf.reduce_mean
。 这样做会将损失除以实际的每个副本中每一步都会改变的批次大小。
这种缩小和缩放是在 keras中 modelcompile
和model.fit
中自动完成的
如果使用tf.keras.losses
类(如下面这个例子所示),则需要将损失减少明确指定为“NONE”或者“SUM”。 使用 tf.distribute.Strategy
时,AUTO
和SUM_OVER_BATCH_SIZE
是不能使用的。 不能使用 AUTO
是因为用户应明确考虑到在分布式情况下他们想做的哪些减少是正确的。不能使用SUM_OVER_BATCH_SIZE
是因为目前它只按每个副本批次大小进行划分,并按照用户的副本数进行划分,这导致了它们很容易丢失。 因此,我们要求用户要明确这些减少。
In [ ]:
with strategy.scope():
# 将减少设置为“无”,以便我们可以在之后进行这个减少并除以全局批量大小。
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)
# 或者使用 loss_fn = tf.keras.losses.sparse_categorical_crossentropy
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
In [ ]:
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
In [ ]:
# 必须在`strategy.scope`下创建模型和优化器。
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
In [ ]:
with strategy.scope():
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
In [ ]:
with strategy.scope():
# `experimental_run_v2`将复制提供的计算并使用分布式输入运行它。
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.experimental_run_v2(train_step,
args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# 训练循环
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# 测试循环
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
以上示例中需要注意的事项:
for x in ...
迭代构造train_dist_dataset
和test_dist_dataset
。distributed_train_step
的返回值。 这个值会在各个副本使用tf.distribute.Strategy.reduce
的时候合并,然后通过tf.distribute.Strategy.reduce
叠加各个返回值来跨批次。tf.distribute.Strategy.experimental_run_v2
时,tf.keras.Metrics
应在train_step
和test_step
中更新。tf.distribute.Strategy.experimental_run_v2
返回策略中每个本地副本的结果,并且有多种方法可以处理此结果。 您可以执行tf.distribute.Strategy.reduce
来获取汇总值。 您还可以执行tf.distribute.Strategy.experimental_local_results
来获取每个本地副本中结果中包含的值列表。一个模型使用了tf.distribute.Strategy
的检查点可以使用策略或者不使用策略进行恢复。
In [ ]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
In [ ]:
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
In [ ]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result()*100))
In [ ]:
with strategy.scope():
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
In [ ]:
with strategy.scope():
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.experimental_run_v2(train_step,
args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
注意:作为通用的规则,您应该使用tf.keras.Metrics
来跟踪每个样本的值以避免它们在副本中合并。
我们 不 建议使用tf.metrics.Mean
来跟踪不同副本的训练损失,因为在执行过程中会进行损失缩放计算。
例如,如果您运行具有以下特点的训练作业:
通过损失缩放,您可以通过添加损失值来计算每个副本上的每个样本的损失值,然后除以全局批量大小。 在这种情况下:(2 + 3)/ 4 = 1.25
和(4 + 5)/ 4 = 2.25
。
如果您使用 tf.metrics.Mean
来跟踪两个副本的损失,结果会有所不同。 在这个例子中,你最终得到一个total
为 3.50 和count
为 2 的结果,当调用result()
时,你将得到total
/count
= 1.75。 使用tf.keras.Metrics
计算损失时会通过一个等于同步副本数量的额外因子来缩放。