In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Note: 이 문서는 텐서플로 커뮤니티에서 번역했습니다. 커뮤니티 번역 활동의 특성상 정확한 번역과 최신 내용을 반영하기 위해 노력함에도 불구하고 공식 영문 문서의 내용과 일치하지 않을 수 있습니다. 이 번역에 개선할 부분이 있다면 tensorflow/docs-l10n 깃헙 저장소로 풀 리퀘스트를 보내주시기 바랍니다. 문서 번역이나 리뷰에 참여하려면 docs-ko@tensorflow.org로 메일을 보내주시기 바랍니다.
이 예제는 사용자 정의 훈련 루프(custom training loops)와 함께 tf.distribute.Strategy
를 사용하는 법을 보여드립니다. 우리는 간단한 CNN 모델을 패션 MNIST 데이터셋에 대해 훈련을 할 것입니다. 패션 MNIST 데이터셋은 60000개의 28 x 28 크기의 훈련 이미지들과 10000개의 28 x 28 크기의 테스트 이미지들을 포함하고 있습니다.
이 예제는 유연성을 높이고, 훈련을 더 잘 제어할 수 있도록 사용자 정의 훈련 루프를 사용합니다. 또한, 사용자 훈련 루프를 사용하는 것은 모델과 훈련 루프를 디버깅하기 쉬워집니다.
In [ ]:
# Import TensorFlow
import tensorflow as tf
# 헬퍼 라이브러리들
import numpy as np
import os
print(tf.__version__)
In [ ]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# 하나의 차원을 배열에 추가 -> 새로운 shape == (28, 28, 1)
# 이렇게 하는 이유는 우리의 모델에서 첫 번째 층이 합성곱 층이고
# 합성곱 층은 4D 입력을 요구하기 때문입니다.
# (batch_size, height, width, channels).
# batch_size 차원은 나중에 추가할 것입니다.
train_images = train_images[..., None]
test_images = test_images[..., None]
# 이미지를 [0, 1] 범위로 변경하기.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
tf.distribute.MirroredStrategy
전략이 어떻게 동작할까요?
노트: 하나의 범위를 지정해서 모든 코드를 집어넣을 수 있습니다. 자, 같이 살펴보시죠!
In [ ]:
# 만약 장치들의 목록이 `tf.distribute.MirroredStrategy` 생성자 안에 명시되어 있지 않다면,
# 자동으로 장치를 인식할 것입니다.
strategy = tf.distribute.MirroredStrategy()
In [ ]:
print ('장치의 수: {}'.format(strategy.num_replicas_in_sync))
그래프와 변수를 플랫폼과 무관한 SavedModel 형식으로 내보냅니다. 모델을 내보냈다면, 모델을 불러올 때 범위(scope)를 지정해도 되고 하지 않아도 됩니다.
In [ ]:
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
분산 데이터셋들을 strategy.scope
내에 생성합니다.
In [ ]:
with strategy.scope():
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
In [ ]:
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
return model
In [ ]:
# 체크포인트들을 저장하기 위해서 체크포인트 디렉토리를 생성합니다.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
일반적으로, GPU/CPU 비율이 1인 단일 장치에서 손실은 입력 배치(batch)의 샘플 개수로 나누어집니다.
그렇다면, tf.distribute.Strategy
를 사용할 때, 손실은 어떻게 계산되어야 할까요?
예를들면, 4개의 GPU가 있고 입력 배치 크기가 64라고 하죠. 입력 배치 하나가 여러 개의 장치(4개의 GPU)에 분배됩니다. 각 장치는 크기가 16인 입력을 받습니다.
각 장치에 있는 모델은 해당 입력에 대해 정방향 계산(forward pass)을 수행하고 손실을 계산합니다. 손실을 장치에 할당된 입력 샘플의 수(BATCH_SIZE_PER_REPLICA = 16)로 나누는 것이 아니라 GLOBAL_BATCH_SIZE(64)로 나누어야 합니다.
왜 이렇게 할까요?
이 것을 텐서플로에서는 어떻게 할까요?
만약 이 예제처럼 사용자 정의 훈련 루프를 작성한다면, 다음과 같이 샘플당 손실을 더하고 GLOBAL_BATCH_SIZE로 나누어야 합니다.
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
또는 tf.nn.compute_average_loss
함수를 사용할 수도 있습니다. 이 함수는 샘플당 손실과 선택적으로 샘플 가중치, GLOBAL_BATCH_SIZE를 매개변수 값으로 받고 스케일이 조정된 손실을 반환합니다.
만약 규제 손실을 사용하는 모델이라면, 장치 개수로 손실 값을 스케일 조정해야 합니다. 이는 tf.nn_scale_regularization_loss
함수를 사용하여 처리할 수 있습니다.
tf.reduce_mean
을 사용하는 것은 추천하지 않습니다. 이렇게 하는 것은 손실을 실제 장치당 배치 크기로 나눕니다. 이 실제 장치당 배치 크기는 아마 각 단계(step)마다 크기가 다를 수 있습니다.
이런 축소(reduction
)와 스케일 조정은 케라스의 model.compile
과 model.fit
에서 자동적으로 수행됩니다.
만약 tf.keras.losses
클래스(아래의 예제에서처럼)를 사용한다면, reduction 매개변수를 명시적으로 NONE
또는 SUM
중 하나로 표시해야 합니다. AUTO
가 허용되지 않는 것은 사용자가 분산 모드에서 어떻게 축소할지 명시적으로 지정하는 것이 바람직하기 때문입니다.
현재 SUM_OVER_BATCH_SIZE
가 장치당 배치 크기로만 나누고 장치 개수로 나누는 것은 사용자에게 위임하기 때문입니다. 그래서 이렇게 하는 것 대신에 사용자가 명시적으로 축소를 수행하도록 만드는 것이 좋습니다.
In [ ]:
with strategy.scope():
# reduction을 `none`으로 설정합니다. 그래서 우리는 축소를 나중에 하고,
# GLOBAL_BATCH_SIZE로 나눌 수 있습니다.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)
# 또는 loss_fn = tf.keras.losses.sparse_categorical_crossentropy를 사용해도 됩니다.
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
In [ ]:
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
In [ ]:
# 모델과 옵티마이저는 `strategy.scope`에서 만들어져야 합니다.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
In [ ]:
with strategy.scope():
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
In [ ]:
with strategy.scope():
# `experimental_run_v2`는 주어진 계산을 복사하고,
# 분산된 입력으로 계산을 수행합니다.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.experimental_run_v2(train_step,
args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# 훈련 루프
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# 테스트 루프
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("에포크 {}, 손실: {}, 정확도: {}, 테스트 손실: {}, "
"테스트 정확도: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
위의 예제에서 주목해야 하는 부분
train_dist_dataset
과 test_dist_dataset
을 for x in ...
구조를 통해서 반복합니다.distributed_train_step
의 반환값입니다. tf.distribute.Strategy.reduce
호출을 사용해서 장치들 간의 스케일이 조정된 손실 값을 전부 합칩니다. 그리고 나서 tf.distribute.Strategy.reduce
반환 값을 더하는 식으로 배치 간의 손실을 모읍니다.tf.keras.Metrics
는 tf.distribute.Strategy.experimental_run_v2
에 의해서 실행되는 train_step
과 test_step
함수 안에서 업데이트 되어야 합니다.tf.distribute.Strategy.experimental_run_v2
는 그 전략안에 포함된 각 지역 복제 모델로부터 결과값을 반환해 줍니다. 그리고 이 결과를 사용하는 몇 가지 방법들이 있습니다. tf.distribute.Strategy.reduce
를 이용하여 값들을 합칠 수 있습니다. tf.distribute.Strategy.experimental_local_results
를 사용해서 결과값(지역 복제 모델 당 하나의 결과값)에 들어있는 값들의 리스트를 얻을 수도 있습니다.tf.distribute.Strategy
를 사용해서 체크포인트가 만들어진 모델은 전략 사용 여부에 상관없이 불러올 수 있습니다.
In [ ]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
In [ ]:
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
In [ ]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: {}'.format(
eval_accuracy.result()*100))
In [ ]:
with strategy.scope():
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("에포크 {}, 손실: {}, 정확도: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
In [ ]:
with strategy.scope():
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.experimental_run_v2(train_step,
args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
노트: 일반적인 규칙으로, tf.keras.Metrics
를 사용하여 샘플당 손실 값을 기록하고 장치 내부에서 값이 합쳐지는 것을 피해야 합니다.
tf.metrics.Mean
을 사용하여 여러 장치의 훈련 손실을 기록하는 것을 추천하지 않습니다. 왜냐하면 손실의 스케일을 조정하는 계산이 수행되기 때문입니다.
예를 들어, 다음과 같은 조건의 훈련을 수행한다고 합시다.
손실의 스케일 조정을 하면, 손실 값을 더하고 GLOBAL_BATCH_SIZE로 나누어 각 장치에 대한 샘플당 손실값을 계산할 수 있습니다. 이 경우에는 (2 + 3) / 4 = 1.24와 (4 + 5) / 4 = 2.25입니다.
만약 tf.metrics.Mean
을 사용해서 두 개의 장치에 대해 손실값을 계산한다면, 결과값이 다릅니다. 이 예제에서는, 측정 지표의 result()
가 메서드가 호출될 때 total
이 3.50이고 count
가 2입니다. 결과값은 total/count
가 1.75가 됩니다. tf.keras.Metrics
를 이용해서 계산한 손실값이 추가적인 요인에 의해서 크기조정되며, 이 추가적인 요인은 동기화되는 장치의 개수입니다.
사용자 정의 훈련루프를 포함한 분산 전략을 사용하는 몇 가지 예제가 있습니다.
MirroredStrategy
를 사용해서 MNIST를 훈련하는 TutorialMirroredStrategy
를 사용하는 DenseNet 예제.MirroredStrategy
와 TPUStrategy
를 사용해서 훈련하는 BERT 예제. 이 예제는 분산 훈련 중에 어떻게 체크포인트로부터 불러오는지와 어떻게 주기적으로 체크포인트들을 생성해 내는지를 이해하기에 정말 좋습니다.keras_use_ctl flag
를 사용해서 활성화 할 수 있는 MirroredStrategy를 이용해서 훈련되는 NCF 예제MirroredStrategy
을 사용해서 훈련되는 NMT 예제.더 많은 예제는 여기에 있습니다. Distribution strategy guide