In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Overview

This tutorial demonstrates multi-worker distributed training with Keras model using tf.distribute.Strategy API, specifically tf.distribute.experimental.MultiWorkerMirroredStrategy. With the help of this strategy, a Keras model that was designed to run on single-worker can seamlessly work on multiple workers with minimal code change.

Distributed Training in TensorFlow guide is available for an overview of the distribution strategies TensorFlow supports for those interested in a deeper understanding of tf.distribute.Strategy APIs.

Setup

First, setup TensorFlow and the necessary imports.


In [ ]:
!pip install tf-nightly
import tensorflow as tf
import numpy as np

Preparing dataset

Now, let's prepare the MNIST dataset. The MNIST dataset comprises 60,000 training examples and 10,000 test examples of the handwritten digits 0–9, formatted as 28x28-pixel monochrome images. In this example, we will take the training part of the datasets to demonstrate.


In [ ]:
def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

Build the Keras model

Here we use tf.keras.Sequential API to build and compile a simple convolutional neural networks Keras model to train with our MNIST dataset.

Note: For a more comprehensive walkthrough of building Keras model, please see TensorFlow Keras Guide.


In [ ]:
def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

Let's first try training the model for a small number of epochs and observe the results in single worker to make sure everything works correctly. You should expect to see the loss dropping and accuracy approaching 1.0 as epoch advances.


In [ ]:
per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

Multi-worker Configuration

Now let's enter the world of multi-worker training. In TensorFlow, TF_CONFIG environment variable is required for training on multiple machines, each of which possibly has a different role. TF_CONFIG is a JSON string used to specify the cluster configuration on each worker that is part of the cluster.

There are two components of TF_CONFIG: cluster and task. cluster provides information about the training cluster, which is a dict consisting of different types of jobs such as worker. In multi-worker training with MultiWorkerMirroredStrategy, there is usually one worker that takes on a little more responsibility like saving checkpoint and writing summary file for TensorBoard in addition to what a regular worker does. Such worker is referred to as the 'chief' worker, and it is customary that the worker with index 0 is appointed as the chief worker (in fact this is how tf.distribute.Strategy is implemented). task on the other hand provides information of the current task. The first component cluster is the same for all workers, and the second component task is different on each worker and specifies the type and index of that worker.

In this example, we set the task type to "worker" and the task index to 0. This means the machine that has such setting is the first worker, which will be appointed as the chief worker and do more work than other workers. Note that other machines will need to have TF_CONFIG environment variable set as well, and it should have the same cluster dict, but different task type or task index depending on what the roles of those machines are.

For illustration purposes, this tutorial shows how one may set a TF_CONFIG with 2 workers on localhost. In practice, users would create multiple workers on external IP addresses/ports, and set TF_CONFIG on each worker appropriately.

Warning: Do not execute the following code in Colab. TensorFlow's runtime will attempt to create a gRPC server at the specified IP address and port, which will likely fail.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Note that while the learning rate is fixed in this example, in general it may be necessary to adjust the learning rate based on the global batch size.

Choose the right strategy

In TensorFlow, distributed training consists of synchronous training, where the steps of training are synced across the workers and replicas, and asynchronous training, where the training steps are not strictly synced.

MultiWorkerMirroredStrategy, which is the recommended strategy for synchronous multi-worker training, will be demonstrated in this guide. To train the model, use an instance of tf.distribute.experimental.MultiWorkerMirroredStrategy. MultiWorkerMirroredStrategy creates copies of all variables in the model's layers on each device across all workers. It uses CollectiveOps, a TensorFlow op for collective communication, to aggregate gradients and keep the variables in sync. The tf.distribute.Strategy guide has more details about this strategy.


In [ ]:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

Note: TF_CONFIG is parsed and TensorFlow's GRPC servers are started at the time MultiWorkerMirroredStrategy() is called, so TF_CONFIG environment variable must be set before a tf.distribute.Strategy instance is created.

MultiWorkerMirroredStrategy provides multiple implementations via the CollectiveCommunication parameter. RING implements ring-based collectives using gRPC as the cross-host communication layer. NCCL uses Nvidia's NCCL to implement collectives. AUTO defers the choice to the runtime. The best choice of collective implementation depends upon the number and kind of GPUs, and the network interconnect in the cluster.

Train the model with MultiWorkerMirroredStrategy

With the integration of tf.distribute.Strategy API into tf.keras, the only change you will make to distribute the training to multi-worker is enclosing the model building and model.compile() call inside strategy.scope(). The distribution strategy's scope dictates how and where the variables are created, and in the case of MultiWorkerMirroredStrategy, the variables created are MirroredVariables, and they are replicated on each of the workers.

Note: Currently there is a limitation in MultiWorkerMirroredStrategy where TensorFlow ops need to be created after the instance of strategy is created. If you see RuntimeError: Collective ops must be configured at program startup, try creating the instance of MultiWorkerMirroredStrategy at the beginning of the program and put the code that may create ops after the strategy is instantiated.

Note: In this Colab, the following code can run with expected result, but however this is effectively single-worker training since TF_CONFIG is not set. Once you set TF_CONFIG in your own example, you should expect speed-up with training on multiple machines.

Note: Always pass in steps_per_epoch argument to model.fit() since MultiWorkerMirroredStrategy does not support last partial batch handling. When using steps_per_epoch, model.fit() does not create a new iterator from the input every epoch, but continues from wherever the last epoch ended. Hence, make sure to call .repeat() on the dataset so it has an adequate number of examples for N epochs. If your dataset is not a repeated dataset, the steps_per_epoch should be set based on the amount of training data on each worker so that all workers would perform the same number of steps of training or evaluation, which is required by allreduce. In particular, if the sharding is not balanced, steps_per_epoch should be set to the size of the smallest sharded devided by the per-worker batch size.


In [ ]:
num_workers = 4

# Here the batch size scales up by number of workers since 
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, 
# and now this becomes 128.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Dataset sharding and batch size

In multi-worker training with MultiWorkerMirroredStrategy, sharding the dataset is needed to ensure convergence and performance. However, note that in above code snippet, the datasets are directly passed to model.fit() without needing to shard; this is because tf.distribute.Strategy API takes care of the dataset sharding automatically. It shards the dataset at the file level which may create skewed shards. In extreme cases where there is only one file, only the first shard (i.e. worker) will get training or evaluation data and as a result all workers will get errors.

If you prefer manual sharding for your training, automatic sharding can be turned off via tf.data.experimental.DistributeOptions api. Concretely,


In [ ]:
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Another thing to notice is the batch size for the datasets. In the code snippet above, we use global_batch_size = per_worker_batch_size * num_workers, which is num_workers times as large as the case it was for single worker, because the effective per worker batch size is the global batch size (the parameter passed in tf.data.Dataset.batch()) divided by the number of workers, and with this change we are keeping the per worker batch size same as before.

Evaluation

If you pass validation_data into model.fit, it will alternate between training and evaluation for each epoch. The evaluation taking validation_data is distributed across the same set of workers and the evaluation results are aggregated and available for all workers. Similar to training, the validation dataset is automatically sharded at the file level. You need to set a global batch size in the validation dataset and set validation_steps. A repeated dataset is also recommended for evaluation.

Alternatively, you can also create another task that periodically reads checkpoints and runs the evaluation. This is what Estimator does. But this is not a recommended way to perform evaluation and thus its details are omitted.

Prediction

Currently model.predict doesn't work with MultiWorkerMirroredStrategy.

Performance

You now have a Keras model that is all set up to run in multiple workers with MultiWorkerMirroredStrategy. You can try the following techniques to tweak performance of multi-worker training with MultiWorkerMirroredStrategy.

  • MultiWorkerMirroredStrategy provides multiple collective communication implementations. RING implements ring-based collectives using gRPC as the cross-host communication layer. NCCL uses Nvidia's NCCL to implement collectives. AUTO defers the choice to the runtime. The best choice of collective implementation depends upon the number and kind of GPUs, and the network interconnect in the cluster. To override the automatic choice, specify a valid value to the communication parameter of MultiWorkerMirroredStrategy's constructor, e.g. communication=tf.distribute.experimental.CollectiveCommunication.NCCL.
  • Cast the variables to tf.float if possible. The official ResNet model includes an example of how this can be done.

Fault tolerance

In synchronous training, the cluster would fail if one of the workers fails and no failure-recovery mechanism exists. Using Keras with tf.distribute.Strategy comes with the advantage of fault tolerance in cases where workers die or are otherwise unstable. We do this by preserving training state in the distributed file system of your choice, such that upon restart of the instance that previously failed or preempted, the training state is recovered.

Since all the workers are kept in sync in terms of training epochs and steps, other workers would need to wait for the failed or preempted worker to restart to continue.

Note: Previously, the ModelCheckpoint callback provided a mechanism to restore training state upon restart from job failure for multi-worker training. We are introducing a new BackupAndRestore callback, to also add the support to single worker training for a consistent experience, and removed fault tolerance functionality from existing ModelCheckpoint callback. From now on, applications that rely on this behavior should migrate to the new callback.

ModelCheckpoint callback

ModelCheckpoint callback no longer provides fault tolerance functionality, please use BackupAndRestore callback instead.

The ModelCheckpoint callback can still be used to save checkpoints. But with this, if training was interrupted or successfully finished, in order to continue training from the checkpoint, user is responsible to load the model manually. Optionally user can choose to save and restore model/weights outside ModelCheckpoint callback.

Save/Restore outside ModelCheckpoint callback

You can save/restore your model using model.save or tf.saved_model.save, and tf.keras.models.load_model. Or you can save/restore a checkpoint with tf.train.Checkpoint. And restore from the latest checkpoint in the model directory using tf.train.latest_checkpoint.

You will need to save the model/checkpoint to a temporary directory on the workers and to the provided directory on the chief. Each worker saves separately, so the temporary directories on the workers need to be unique to prevent errors resulting from multiple workers trying to write to the same location.

The model/checkpoint saved in all the directories are identical and typically only the one saved by the chief should be referenced for restoring or serving.

We recommend that you have some cleanup logic that deletes the temporary directories created by the workers once your training has completed.

For multi-worker training, the reason we need to save on the chief and workers is because we might be aggregating variables during checkpointing which requires the chief and workers to participate in the allreduce communication protocol. Letting chief and workers save to the same model directory will result in errors due to contention.


In [ ]:
# Saving a model
# Let `is_chief` be a utility function that inspects the cluster spec and 
# current task type and returns True if the worker is the chief and False 
# otherwise.
def is_chief():
  return True

if is_chief():
  # This is the model directory will be ideally be a cloud bucket.
  path = '/tmp/model_dir'
else:
  # Save to a path that is unique across workers.
  worker_id = 1
  path = '/tmp/model_dir/worker_tmp_' + str(worker_id)

# checkpoint current model
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
manager = tf.train.CheckpointManager(
    checkpoint, directory=path, max_to_keep=5)
manager.save()

# Restoring a checkpoint
# On the Chief
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
manager = tf.train.CheckpointManager(
    checkpoint, directory=path, max_to_keep=5)
status = checkpoint.restore(manager.latest_checkpoint)

# On the Workers
# This is the path that the chief saves the model to
model_dir_path = '/tmp/model_dir'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
latest_checkpoint = tf.train.latest_checkpoint(model_dir_path)
status = checkpoint.restore(latest_checkpoint)

BackupAndRestore callback

Note: tf.keras.callbacks.experimental.BackupAndRestore callback is only available in tf-nightly.

BackupAndRestore callback provides fault tolerance functionality, by backing up the model and current epoch number in a temporary checkpoint file under backup_dir argument to BackupAndRestore. This is done at the end of each epoch.

Once jobs get interrupted and restart, the callback restores the last checkpoint, and training continues from the beginning of the interrupted epoch. Any partial training already done in the unfinished epoch before interruption will be thrown away, so that it doesn't affect the final model state.

To use it, provide an instance of tf.keras.callbacks.experimental.BackupAndRestore at the tf.keras.Model.fit() call.

With MultiWorkerMirroredStrategy, if a worker gets interrupted, the whole cluster pauses until the interrupted worker is restarted. Other workers will also restart, and the interrupted worker rejoins the cluster. Then, every worker reads the checkpoint file that was previously saved and picks up its former state, thereby allowing the cluster to get back in sync. Then the training continues.

BackupAndRestore callback uses CheckpointManager to save and restore the training state, which generates a file called checkpoint that tracks existing checkpoints together with the latest one. For this reason, backup_dir should not be re-used to store other checkpoints in order to avoid name collision.

Currently, BackupAndRestore callback supports single worker with no strategy, MirroredStrategy, and multi-worker with MultiWorkerMirroredStrategy. Below are two examples for both multi-worker training and single worker training.


In [ ]:
# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)

If you inspect the directory of backup_dir you specified in BackupAndRestore, you may notice some temporarily generated checkpoint files. Those files are needed for recovering the previously lost instances, and they will be removed by the library at the end of tf.keras.Model.fit() upon successful exiting of your training.

Note: Currently BackupAndRestore only supports eager mode. In graph mode, consider using Save/Restore Model mentioned above, and by providing initial_epoch in model.fit().

See also

  1. Distributed Training in TensorFlow guide provides an overview of the available distribution strategies.
  2. Official models, many of which can be configured to run multiple distribution strategies.
  3. The Performance section in the guide provides information about other strategies and tools you can use to optimize the performance of your TensorFlow models.