In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Note: 我们的 TensorFlow 社区翻译了这些文档。因为社区翻译是尽力而为, 所以无法保证它们是最准确的,并且反映了最新的 官方英文文档。如果您有改进此翻译的建议, 请提交 pull request 到 tensorflow/docs GitHub 仓库。要志愿地撰写或者审核译文,请加入 docs-zh-cn@tensorflow.org Google Group

概述

本教程展示了在训练分布式多工作器(worker)时,如何使用 tf.distribute.Strategy。如果你的代码使用了 tf.estimator,而且你也对拓展单机以获取高性能有兴趣,那么这个教程就是为你准备的。

在开始之前,请先阅读 tf.distribute.Strategy 指南。同样相关的还有 使用多 GPU 训练教程,因为在这个教程里也使用了相同的模型。

创建

首先,设置好 TensorFlow 以及将会用到的输入模块。


In [ ]:
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json

输入函数

本教程里我们使用的是 TensorFlow 数据集(TensorFlow Datasets)里的 MNIST 数据集。本教程里的代码和 使用多 GPU 训练教程 类似,但有一个主要区别:当我们使用 Estimator 进行多工作器训练时,需要根据工作器的数量对数据集进行拆分,以确保模型收敛。输入的数据根据工作器其自身的索引来拆分,因此每个工作器各自负责处理该数据集 1/num_workers 个不同部分。


In [ ]:
BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

使模型收敛的另一种合理方式是在每个工作器上设置不同的随机种子,然后对数据集进行随机重排。

多工作器配置

本教程主要的不同(区别于使用多 GPU 训练教程)在于多工作器的创建。明确集群中每个工作器的配置的标准方式是设置环境变量 TF_CONFIG

TF_CONFIG 里包括了两个部分:clustertaskcluster 提供了关于整个集群的信息,也就是集群中的工作器和参数服务器(parameter server)。task 提供了关于当前任务的信息。在本例中,任务的类型(type)是 worker 且该任务的索引(index)是 0。

出于演示的目的,本教程展示了怎么将 TF_CONFIG 设置成两个本地的工作器。在实践中,你可以在外部的IP地址和端口上创建多个工作器,并为每个工作器正确地配置好 TF_CONFIG 变量,也就是更改任务的索引。

警告:不要在 Colab 里执行以下代码。TensorFlow 的运行程序会试图在指定的 IP 地址和端口创建 gRPC 服务器,这会导致创建失败。

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

定义模型

定义训练中用到的层,优化器和损失函数。本教程使用 Keras layers 定义模型,同使用多 GPU 训练教程类似。


In [ ]:
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

注意:尽管在本例中学习率是固定的,但是通常情况下可能有必要基于全局的批次大小对学习率进行调整。

MultiWorkerMirroredStrategy

为训练模型,需要使用 tf.distribute.experimental.MultiWorkerMirroredStrategy 实例。MultiWorkerMirroredStrategy 创建了每个设备中模型层里所有变量的拷贝,且是跨工作器的。其用到了 CollectiveOps,这是 TensorFlow 里的一种操作,用来整合梯度以及确保变量同步。该策略的更多细节可以在 tf.distribute.Strategy 指南中找到。


In [ ]:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

训练和评估模型

接下来,在 RunConfig 中为 estimator 指明分布式策略,同时通过调用 tf.estimator.train_and_evaluate 训练和评估模型。本教程只通过指明 train_distribute 进行分布式训练。但是也同样也可以通过指明 eval_distribute 来进行分布式评估。


In [ ]:
config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)

优化训练后的模型性能

现在你已经有了由 tf.distribute.Strategy 的模型和能支持多工作器的 Estimator。你可以尝试使用下列技巧来优化多工作器训练的性能。

  • 增加单批次的大小: 此处的批次大小指的是每个 GPU 上的批次大小。通常来说,最大的批次大小应该适应 GPU 的内存大小。
  • 变量转换: 尽可能将变量转换成 tf.float。官方的 ResNet 模型包括了如何完成的样例
  • 使用集群通信: MultiWorkerMirroredStrategy 提供了好几种集群通信的实现.
    • RING 实现了基于环状的集群,使用了 gRPC 作为跨主机通讯层。
    • NCCL 使用了 英伟达的 NCCL 来实现集群。
    • AUTO 将选择延后至运行时。

集群实现的最优选择不仅基于 GPU 的数量和种类,也基于集群间的通信网络。想要覆盖自动的选项,需要指明 MultiWorkerMirroredStrategy 的构造器里的 communication 参数,例如让 communication=tf.distribute.experimental.CollectiveCommunication.NCCL

更多的代码示例

  1. 端到端的示例里使用了 Kubernetes 模板。在这个例子里我们一开始使用了 Keras 模型,并使用了 tf.keras.estimator.model_to_estimator API 将其转换成了 Estimator。
  2. 官方的 ResNet50 模型,我们可以使用 MirroredStrategyMultiWorkerMirroredStrategy 来训练它。