In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

케라스를 사용한 다중 워커(Multi-worker) 훈련

TensorFlow.org에서 보기 깃허브(GitHub) 소스 보기 Download notebook

Note: 이 문서는 텐서플로 커뮤니티에서 번역했습니다. 커뮤니티 번역 활동의 특성상 정확한 번역과 최신 내용을 반영하기 위해 노력함에도 불구하고 공식 영문 문서의 내용과 일치하지 않을 수 있습니다. 이 번역에 개선할 부분이 있다면 tensorflow/docs-l10n 깃헙 저장소로 풀 리퀘스트를 보내주시기 바랍니다. 문서 번역이나 리뷰에 참여하려면 docs-ko@tensorflow.org로 메일을 보내주시기 바랍니다.

개요

이 튜토리얼에서는 tf.distribute.Strategy API를 사용하여 케라스 모델을 다중 워커로 분산 훈련하는 방법을 살펴보겠습니다. 다중 워커를 사용하여 훈련할 수 있도록 전략을 디자인했기 때문에, 단일 워커 훈련용으로 만들어진 케라스 모델도 코드를 조금만 바꾸면 다중 워커를 사용하여 훈련할 수 있습니다.

tf.distribute.Strategy API에 관심이 있으신 분들은 텐서플로로 분산 훈련하기 가이드에서 텐서플로가 제공하는 분산 훈련 전략들을 훑어보실 수 있습니다.

설정

먼저, 텐서플로를 설정하고 필요한 패키지들을 가져옵니다.


In [ ]:
import tensorflow as tf
import numpy as np

데이터셋 준비하기

MNIST 데이터셋 준비해 보겠습니다. MNIST 데이터셋은 0-9 숫자를 손으로 쓴 28x28 픽셀 흑백 이미지입니다. 6만 개의 훈련 샘플과 1만 개의 테스트 샘플이 들어있습니다. 이 예에서는 훈련 데이터만 사용하겠습니다.


In [ ]:
def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # `x` 배열의 값은 uint8이고 [0, 255] 범위입니다.
  # 이를 [0, 1] 사이의 float32 값으로 바꾸어야 합니다.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

케라스 모델 만들기

tf.keras.Sequential API를 사용하여 간단한 합성곱 신경망 케라스 모델을 만들고 컴파일하도록 하겠습니다. MNIST 데이터셋으로 훈련시킬 모델입니다.

Note: 케라스 모델을 만드는 절차는 텐서플로 케라스 가이드에서 더 상세하게 설명하고 있습니다.


In [ ]:
def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

먼저 단일 워커를 이용하여 적은 수의 에포크만큼만 훈련을 해보고 잘 동작하는지 확인해봅시다. 에포크가 넘어감에 따라 손실(loss)은 줄어들고 정확도는 1.0에 가까워져야 합니다.


In [ ]:
per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

다중 워커 구성

자 이제 다중 워커 훈련의 세계로 들어가 봅시다. 텐서플로에서 여러 장비를 사용할 때는 TF_CONFIG 환경 변수를 설정해야 합니다. 하나의 클러스터를 구성하는 각 장비에 클러스터 구성을 알려주고 각각 다른 역할을 부여하기 위해 TF_CONFIG를 사용합니다.

TF_CONFIGclustertask 두 개의 부분으로 구성됩니다. cluster에는 훈련 클러스터에 대한 정보를 지정합니다. worker 같은 여러 타입의 작업 이름을 키로 하는 파이썬 딕셔너리를 지정합니다. 다중 워커 훈련에서는 보통 일반적인 워커보다 조금 더 많은 일을 하는 특별한 워커가 하나 필요합니다. 이 워커는 체크포인트를 저장하거나, 서머리(summary)를 쓰는 일 등을 추가로 담당하게 됩니다. 보통 치프('chief') 워커라고 부르고, 관례적으로 index 번호가 0인 워커가 치프 워커가 됩니다(사실 tf.distribute.Strategy가 이렇게 구현되었습니다). 한편 task에는 현재 워커의 작업에 대한 정보를 지정합니다.

이 예에서는 작업(task) type"worker"로 지정하고, index0으로 지정하였습니다. 이 말은 이 장비가 첫 번째 워커이고, 따라서 치프 워커이며, 다른 워커보다 더 많은 일을 하게 된다는 뜻입니다. 물론 다른 장비들에도 TF_CONFIG 환경변수가 설정되어야 합니다. 다른 장비들에도 cluster에는 동일한 딕셔너리를 지정하겠지만, task에는 각 장비의 역할에 따라 다른 작업 type이나 index를 지정해야 합니다.

예시를 위하여, 이 튜토리얼에서는 두 개의 워커를 localhost에 띄우는 방법을 보여드리겠습니다. 실제로는 각 워커를 다른 장비에서 띄울텐데, 실제 IP 주소와 포트를 할당하고, 그에 맞게 TF_CONFIG를 지정해야 합니다.

주의: 아래 코드를 코랩에서 실행하지 마십시오. 텐서플로 런타임이 주어진 IP와 포트로 gRPC 서버를 띄우려고 할 텐데, 아마도 실패할 것입니다.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

이 예제에서는 학습률을 바꾸지 않고 그대로 사용한 것에 주의하십시오. 실제로는 전역(global) 배치 크기에 따라 학습률을 조정해야 할 수 있습니다.

적절한 전략 고르기

텐서플로의 분산 전략은 크게 각 훈련 단계가 워커들이 가진 복제본들끼리 동기화되는 동기 훈련 방식과, 동기화가 엄격하게 이루어지지 않는 비동기 훈련 방식이 있습니다.

이 튜토리얼에서 다루는 MultiWorkerMirroredStrategy는 동기 다중 워커 훈련에서 추천하는 전략입니다. 모델을 훈련하려면 tf.distribute.experimental.MultiWorkerMirroredStrategy 인스턴스를 하나 만드십시오. MultiWorkerMirroredStrategy는 모델의 층에 있는 모든 변수의 복사본을 각 워커의 장치마다 만듭니다. 그리고 수집 작업을 위한 텐서플로 연산인 CollectiveOps를 사용하여 그래디언트를 모으고, 각 변수의 값을 동기화합니다. tf.distribute.Strategy 가이드에 이 전략에 대한 더 자세한 설명이 있습니다.


In [ ]:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

Note: MultiWorkerMirroredStrategy.__init__()가 호출될 때, TF_CONFIG를 파싱하고 텐서플로 gRPC 서버가 구동됩니다. 따라서 TF_CONFIG 환경변수는 tf.distribute.Strategy 인스턴스를 만들기 전에 설정해야 합니다.

MultiWorkerMirroredStrategyCollectiveCommunication 매개변수로 선택할 수 있는 여러 가지 구현체를 제공합니다. RING(링)은 링 구조 기반의 수집 작업 구현체이고, 장비 간 통신을 위하여 gRPC를 사용합니다. NCCLNvidia의 NCCL로 수집 작업을 구현한 것입니다. AUTO를 지정하면, 런타임이 알아서 선택합니다. 어떤 수집 작업 구현체가 최적인지는 GPU의 종류와 수, 클러스터 내 네트워크 연결 등 여러 요소에 따라 달라집니다.

MultiWorkerMirroredStrategy로 모델 훈련하기

다중 워커 분산 훈련을 위하여 tf.distribute.Strategy API를 tf.keras와 함께 사용하려면, 딱 한 가지만 바꾸면 됩니다. 바로 모델 구성과 model.compile() 호출 코드를 strategy.scope() 안으로 넣는 것입니다. 분산 전략의 범위(scope)를 써서 변수를 어디에 어떻게 만들지 지정할 수 있습니다. MultiWorkerMirroredStrategy의 경우, 만들어지는 변수는 MirroredVariable이고, 각 워커에 복제본이 생깁니다.

Note: 현재는 MultiWorkerMirroredStrategy에 제약 사항이 있습니다. 전략 객체를 만든 후 텐서플로 연산을 만들어야 합니다. 만약 RuntimeError: Collective ops must be configured at program startup 에러가 발생하면 프로그램 서두에서 MultiWorkerMirroredStrategy 객체를 만들고 이 전략이 초기화된 후 연산을 만드는 코드를 추가하세요.

Note: 코랩에서는 아래 코드가 기대한대로 동작하는 것처럼 보이겠지만, 사실은 단일 워커로 동작하는 것입니다. TF_CONFIG가 설정되어 있지 않기 때문입니다. 실전에서는 TF_CONFIG를 설정하면, 여러 장비를 활용하여 훈련 속도가 빨라지는 것을 볼 수 있습니다.

Note: MultiWorkerMirroredStrategy는 마지막에 남는 작은 배치를 처리하지 못하기 때문에 model.fit() 메서드에 항상 steps_per_epoch 매개변수를 전달해야 합니다. steps_per_epoch를 사용할 때 model.fit()은 에포크마다 입력에서 새로운 반복자(iterator)를 만들지 않고 마지막 에포크가 종료된 곳에서 계속 진행합니다. 따라서 데이터셋의 .repeat() 메서드를 호출하여 N 에포크 동안 적절한 수의 샘플을 준비해야 합니다. 반복 가능한 데이터셋이 아니라면 steps_per_epoch는 각 워커의 훈련 데이터 크기를 기반으로 지정되어야 합니다. 모든 워커는 올리듀스(allreduce)를 위해서 동일한 개수의 훈련과 평가의 스텝을 수행하기 때문입니다. 특히 균일하지 않은 샤드(shard)라면 steps_per_epoch는 워커당 배치 크기로 나눈 가장 작은 샤드 크기로 지정되어야 합니다.


In [ ]:
NUM_WORKERS = 4

# 여기서 배치 크기는 워커의 수를 곱한 크기로 늘려야 합니다. `tf.data.Dataset.batch`에는
# 전역 배치 크기를 지정해야 하기 때문입니다. 전에는 64였지만, 이제 128이 됩니다.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # `strategy.scope()` 안에서 모델을 만들고 컴파일합니다.
  multi_worker_model = build_and_compile_cnn_model()

# 케라스의 `model.fit()`은 지정된 에포크 횟수와 에포크 당 스텝 수로 모델을 훈련합니다.
# 여기에 나온 숫자는 예시를 보이기 위한 것일 뿐 제대로 된 모델을 만드는데 충분하지 않습니다.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

데이터셋 샤딩과 배치 크기

다중 워커 훈련에서는 수렴과 성능을 위하여 데이터를 여러 부분으로 샤딩(sharding)해야 합니다. 하지만, 위 코드 예에서는 데이터셋을 샤딩하지 않고 바로 model.fit()으로 보낸 것을 볼 수 있습니다. 이는 tf.distribute.Strategy API가 다중 워커 훈련에 맞게 자동으로 데이터셋을 샤딩해주기 때문입니다.

만약 훈련할 때 샤딩을 직접 하고 싶다면, tf.data.experimental.DistributeOptions API를 사용해서 자동 샤딩 기능을 끌 수 있습니다. 다음과 같이 말입니다.


In [ ]:
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
train_datasets_no_auto_shard = train_datasets.with_options(options)

또 하나 주목할 점은 datasets의 배치 크기입니다. 앞서 코드에서 GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS로 지정하였습니다. 단일 워커일 때보다 NUM_WORKERS 배만큼 크게 지정한 것입니다. 이는 실제로 각 워커에 전달되는 배치 크기가 tf.data.Dataset.batch()에 매개변수로 전달된 전역 배치 크기를 워커의 수로 나눈 것이 되기 때문입니다. 즉, 이렇게 바꾸어야 실제로 워커가 처리하는 배치 크기가 단일 워커일 때와 동일한 값이 됩니다.

평가

model.fit 메서드에 validation_data를 전달하면 에포크마다 훈련과 평가를 반복합니다. validation_data는 워커로 분산되어 평가되며 평가의 결과를 수집하여 모든 워커에 제공합니다. 훈련과 비슷하게 검증 데이터셋은 파일 수준에서 자동으로 샤딩됩니다. 검증 데이터셋에 전역 배치 크기와 validation_steps를 지정해야 합니다. 반복 데이터셋이 평가 단계에도 권장됩니다.

또한 주기적으로 체크포인트를 읽고 평가를 수행하는 작업을 만들 수도 있습니다. 바로 Estimator가 하는 일입니다. 하지만 평가에 권장하는 방식이 아니기 때문에 자세한 설명은 하지 않겠습니다.

예측

현재 model.predict 메서드는 MultiWorkerMirroredStrategy와 작동하지 않습니다.

성능

이제 케라스 모델이 완성되었습니다. MultiWorkerMirroredStrategy를 사용하여 여러 워커를 사용하여 훈련할 수 있습니다. 다중 워커 훈련의 성능을 더 높이려면 다음 기법들을 확인해 보십시오.

  • MultiWorkerMirroredStrategy는 여러 가지 수집 작업 통신 구현체를 제공합니다. RING(링)은 링 구조 기반의 수집 작업 구현체이고, 장비 간 통신을 위하여 gRPC를 사용합니다. NCCLNvidia의 NCCL로 수집 작업을 구현한 것입니다. AUTO를 지정하면, 런타임이 알아서 선택합니다. 어떤 수집 작업 구현체가 최적인지는 GPU의 종류와 수, 클러스터 내 네트워크 연결 등 여러 요소에 따라 달라집니다. 런타임이 알아서 선택한 것을 바꾸려면, MultiWorkerMirroredStrategy 생성자의 communication 매개변수에 적절한 값을 지정하십시오. 예를 들면 communication=tf.distribute.experimental.CollectiveCommunication.NCCL과 같이 지정합니다.
  • 가능하면 변수를 tf.float 타입으로 바꾸십시오. 공식 ResNet 모델을 보면 어떻게 바꾸는지 예제가 있습니다.

내결함성

동기 훈련 방식에서는, 워커 중 하나가 죽으면 전체 클러스터가 죽어버리고, 복구 메커니즘이 따로 없습니다. 하지만 케라스와 tf.distribute.Strategy를 함께 사용하면, 워커가 죽거나 불안정해지는 경우에도 내결함성을 제공합니다. 이는 사용자가 선택한 분산 파일 시스템에 훈련 상태를 저장하는 기능을 제공하기 때문입니다. 기존 인스턴스가 죽거나 정지당해서 재시작되는 경우에도 훈련 상태를 복구할 수 있습니다.

모든 워커가 훈련 에포크 혹은 스텝에 따라 동기화되므로, 다른 워커들은 죽거나 정지당한 워커가 복구될 때까지 기다려야 합니다.

ModelCheckpoint 콜백

다중 워커 훈련의 내결함 기능을 사용하려면, tf.keras.Model.fit()를 호출할 때 tf.keras.callbacks.ModelCheckpoint의 인스턴스를 제공해야 합니다. 이 콜백이 체크포인트와 훈련 상태를 ModelCheckpointfilepath 매개변수에 지정한 디렉터리에 저장합니다.


In [ ]:
# `filepath` 매개변수를 모든 워커가 접근할 수 있는 파일 시스템 경로로 바꾸십시오.
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)

워커가 정지당하면, 정지당한 워커가 다시 살아날 때까지 전체 클러스터가 잠시 멈춥니다. 워커가 클러스터에 다시 들어오면, 다른 워커도 재시작됩니다. 모든 워커가 이전에 저장한 체크포인트 파일을 읽고, 예전 상태를 불러오면 클러스터가 다시 일관된 상태가 됩니다. 그리고서 훈련이 재개됩니다.

ModelCheckpointfilepath가 위치한 디렉터리를 살펴보면, 임시로 생성된 체크포인트 파일들을 확인할 수 있을 것입니다. 이 파일들은 실패한 작업을 복구하는데 필요한 것들로, 다중 워커 훈련 작업을 성공적으로 마치고 나면 tf.keras.Model.fit() 함수가 끝날 때 라이브러리가 알아서 삭제할 것입니다.

ModelCheckpoint 콜백 밖에서 저장/복원하기

model.save이나 tf.saved_model.save를 사용해 모델을 저장하려면 워커에 있는 임시 디렉토리와 치프에서 제공하는 모델 디렉토리에 저장해야 합니다. 동일한 위치에 쓰는 충돌을 방지하기 위해 워커에 있는 임시 디렉토리는 고유해야 합니다. 모든 디렉토리에 저장되는 모델은 동등하며 일반적으로 치프가 저장하는 모델만 복원과 서빙에 사용됩니다. 훈련이 끝나면 워커가 만든 임시 디렉토리를 지우는 작업을 하는 것이 좋습니다.

체크포인트를 복원하려면 tf.train.latest_checkpoint를 사용해 모델 디렉토리에서 가장 최근의 체크포인트를 찾아 여기에서 restore를 호출해야 합니다. 즉 워커에서 임시 디렉토리에 저장하지만 모델 디렉토리에서 복원하는 것은 치프의 체크포인트에서만 가능합니다.

ModelCheckpoint 콜백은 이런 저장과 복원 로직을 감싸고 있습니다. 이 때문에 훈련하는 동안 만들어진 임시 디렉토리를 알고 있을 것입니다.

치프와 워커에 저장하는 이유는 체크포인트를 수행할 때 변수를 모으기 때문입니다. 이 올리듀스 통신 프로토콜에 치프와 워커가 참여해야 합니다. 치프와 워커가 같은 모델 디렉토리에 저장하게 놔두면 경쟁 관계로 오류가 발생합니다.


In [ ]:
# 모델 저장하기
# Let `is_chief`는 클러스터 스펙과 현재 태스크 타입을 조사하여 
# 워커가 치프이면 True를 반환하고 그렇지 않으면 False를 반환하는 유틸리티 함수입니다.
def is_chief():
  return True

if is_chief():
  # 모델 디렉토리, 클라우드 버킷이 이상적입니다.
  path = '/tmp/model_dir'
else:
  # 모든 워커에 고유한 경로에 저장합니다.
  worker_id = 1 
  path = '/tmp/model_dir/worker_tmp_' + str(worker_id)

multi_worker_model.save(path)

# 치프에서
# 체크포인트 복원
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
manager = tf.train.CheckpointManager(
    checkpoint, directory=path, max_to_keep=5)
status = checkpoint.restore(manager.latest_checkpoint)

# 워커에서
# 치프가 모델을 저장한 경로입니다.
model_dir_path = '/tmp/model_dir'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
manager = tf.train.CheckpointManager(
    checkpoint, directory=path, max_to_keep=5)
latest_checkpoint = tf.train.latest_checkpoint(model_dir_path)
status = checkpoint.restore(latest_checkpoint)

참조

  1. 텐서플로로 분산 훈련하기 가이드는 사용 가능한 분산 전략들을 개괄하고 있습니다.
  2. 많은 공식 모델은 다양한 분산 전략으로 실행하도록 설정할 수 있습니다.
  3. 성능 가이드 문서에서 텐서플로 모델의 성능을 최적화하기 위해 사용할 수 있는 다른 전략과 도구를 소개합니다.