Session 3: Sentiment Analysis Basics with TensorFlow 2.x

A Taste of AI, Week 3: 2020-07-21, 20:00–22:00 (120 minutes)

  1. Importing tools and checking the version
  2. Downloading the training data: the Stanford AI Lab movie review DB (aclImdb)
  3. Exploring the training data: preview
  4. Preparing the training data: loading
  5. Preparing the training data: preprocessing (very hard!)
  6. Preparing the training data: checking the preprocessing results
  7. Defining and building the model
  8. Training
  9. Checking the training results
  10. Prediction: play with it!

References

1. Importing tools and checking the version


In [ ]:
# Import the tools
import os
import shutil
import random
import re
import string

import tensorflow as tf # TensorFlow
import matplotlib.pyplot as plt # visualization
%matplotlib inline
import matplotlib.font_manager as fm
import numpy as np

In [ ]:
print(f'Checking the TensorFlow version: {tf.__version__}')

In [ ]:
# Korean font setup for the plots
fonts = fm.findSystemFonts()
nanum_path = None
for font in fonts:
    if font.endswith('NanumGothic.ttf'):
        nanum_path = font
        break
if nanum_path is None:
    print('The Nanum font needs to be installed!')
    print('!apt install -qq -y fonts-nanum*')
    nanum_prop = fm.FontProperties()  # fall back to the default font so later cells still run
else:
    print(f'Nanum font path: {nanum_path}')
    nanum_prop = fm.FontProperties(fname=nanum_path)
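
If the font is missing (for example on a fresh Colab runtime), the commands below are one way to install NanumGothic on a Debian/Ubuntu environment (a sketch; the exact package name is an assumption, and the runtime usually needs a restart afterwards so matplotlib rebuilds its font cache).

In [ ]:
# A sketch for Colab/Ubuntu environments (assumes apt is available).
# Restart the runtime after installing so matplotlib picks up the new fonts.
!apt-get install -qq -y fonts-nanum
!fc-cache -fv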

2. Downloading the training data: the movie review DB (aclImdb)


In [ ]:
# Download the data
url = 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'
dataset = tf.keras.utils.get_file('aclImdb_v1.tar.gz', url,
                                  untar=True, cache_dir='.',
                                  cache_subdir='')
dataset_dir = os.path.join(os.path.dirname(dataset), 'aclImdb')

In [ ]:
# Inspect the extracted data
print(f'Contents of the dataset directory: {os.listdir(dataset_dir)}')

In [ ]:
# Inspect the training data structure
train_dir = os.path.join(dataset_dir, 'train')
print(f'Contents of the training data directory: {os.listdir(train_dir)}')

3. Exploring the training data: preview


In [ ]:
# Sample a few positive and negative training files
pos_train_dir = os.path.join(train_dir, 'pos')
pos_train_files = os.listdir(pos_train_dir)
neg_train_dir = os.path.join(train_dir, 'neg')
neg_train_files = os.listdir(neg_train_dir)
print('Positive files: ', end='')
for _ in range(0, 10):
    print(random.choice(pos_train_files), end=' ')
print()
print('Negative files: ', end='')
for _ in range(0, 10):
    print(random.choice(neg_train_files), end=' ')
print()

In [ ]:
# Read one training example
## https://translate.google.co.kr/
## https://papago.naver.com/
sample_file = os.path.join(train_dir, 'pos', '2678_8.txt')
with open(sample_file, 'r') as f:
    print(f.read())

4. Preparing the training data: loading


In [ ]:
# Tidy up the data
print('''The data used for training should be organized like this:
train/
....pos/
........file1.txt
........file2.txt
....neg/
........file1.txt
........file2.txt''')
remove_dir = os.path.join(train_dir, 'unsup')
print(f'Removing the unneeded data directory: {remove_dir}')
shutil.rmtree(remove_dir)
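
Note that shutil.rmtree raises an error if this cell is run a second time, because the directory is already gone. A re-run-safe variant (a minimal sketch) checks for the directory first:

In [ ]:
# A sketch: only remove the directory if it still exists, so the cell can be re-run safely.
if os.path.exists(remove_dir):
    shutil.rmtree(remove_dir)
else:
    print(f'{remove_dir} has already been removed.')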

In [ ]:
import multiprocessing
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import string_ops

# Helper functions for building a text dataset from a directory tree
# (adapted from the Keras preprocessing utilities).


def index_directory(directory,
                    labels,
                    formats,
                    class_names=None,
                    shuffle=True,
                    seed=None,
                    follow_links=False):
    inferred_class_names = []
    for subdir in sorted(os.listdir(directory)):
        if os.path.isdir(os.path.join(directory, subdir)):
            inferred_class_names.append(subdir)
    if not class_names:
        class_names = inferred_class_names
    else:
        if set(class_names) != set(inferred_class_names):
            raise ValueError(
                    'The `class_names` passed did not match the '
                    'names of the subdirectories of the target directory. '
                    'Expected: %s, but received: %s' %
                    (inferred_class_names, class_names))
    class_indices = dict(zip(class_names, range(len(class_names))))

    pool = multiprocessing.pool.ThreadPool()
    results = []
    filenames = []
    for dirpath in (os.path.join(directory, subdir) for subdir in class_names):
        results.append(
            pool.apply_async(index_subdirectory,
                             (dirpath, class_indices, follow_links, formats)))
    labels_list = []
    for res in results:
        partial_filenames, partial_labels = res.get()
        labels_list.append(partial_labels)
        filenames += partial_filenames
    if labels != 'inferred':
        if len(labels) != len(filenames):
            raise ValueError('Expected the lengths of `labels` to match the number '
                             'of files in the target directory. len(labels) is %s '
                             'while we found %s files in %s.' % (
                                     len(labels), len(filenames), directory))
    else:
        i = 0
        labels = np.zeros((len(filenames),), dtype='int32')
        for partial_labels in labels_list:
            labels[i:i + len(partial_labels)] = partial_labels
            i += len(partial_labels)

    print('Found %d files belonging to %d classes.' %
          (len(filenames), len(class_names)))
    pool.close()
    pool.join()
    file_paths = [os.path.join(directory, fname) for fname in filenames]

    if shuffle:
        if seed is None:
            seed = np.random.randint(1e6)
        rng = np.random.RandomState(seed)
        rng.shuffle(file_paths)
        rng = np.random.RandomState(seed)
        rng.shuffle(labels)
    return file_paths, labels, class_names


def iter_valid_files(directory, follow_links, formats):
    walk = os.walk(directory, followlinks=follow_links)
    for root, _, files in sorted(walk, key=lambda x: x[0]):
        for fname in sorted(files):
            if fname.lower().endswith(formats):
                yield root, fname
        

def index_subdirectory(directory, class_indices, follow_links, formats):
    dirname = os.path.basename(directory)
    valid_files = iter_valid_files(directory, follow_links, formats)
    labels = []
    filenames = []
    for root, fname in valid_files:
        labels.append(class_indices[dirname])
        absolute_path = os.path.join(root, fname)
        relative_path = os.path.join(
                dirname, os.path.relpath(absolute_path, directory))
        filenames.append(relative_path)
    return filenames, labels
        

def check_validation_split_arg(validation_split, subset, shuffle, seed):
    if validation_split and not 0 < validation_split < 1:
        raise ValueError(
                '`validation_split` must be between 0 and 1, received: %s' %
                (validation_split,))
    if (validation_split or subset) and not (validation_split and subset):
        raise ValueError(
                'If `subset` is set, `validation_split` must be set, and inversely.')
    if subset not in ('training', 'validation', None):
        raise ValueError('`subset` must be either "training" '
                         'or "validation", received: %s' % (subset,))
    if validation_split and shuffle and seed is None:
        raise ValueError(
                'If using `validation_split` and shuffling the data, you must provide '
                'a `seed` argument, to make sure that there is no overlap between the '
                'training and validation subset.')


def get_training_or_validation_split(samples, labels, validation_split, subset):
    if not validation_split:
        return samples, labels

    num_val_samples = int(validation_split * len(samples))
    if subset == 'training':
        print('Using %d files for training.' % (len(samples) - num_val_samples,))
        samples = samples[:-num_val_samples]
        labels = labels[:-num_val_samples]
    elif subset == 'validation':
        print('Using %d files for validation.' % (num_val_samples,))
        samples = samples[-num_val_samples:]
        labels = labels[-num_val_samples:]
    else:
        raise ValueError('`subset` must be either "training" '
                         'or "validation", received: %s' % (subset,))
    return samples, labels


def labels_to_dataset(labels, label_mode, num_classes):
    label_ds = dataset_ops.Dataset.from_tensor_slices(labels)
    if label_mode == 'binary':
        label_ds = label_ds.map(
                lambda x: array_ops.expand_dims(math_ops.cast(x, 'float32'), axis=-1))
    elif label_mode == 'categorical':
        label_ds = label_ds.map(lambda x: array_ops.one_hot(x, num_classes))
    return label_ds


def paths_and_labels_to_dataset(file_paths,
                                labels,
                                label_mode,
                                num_classes,
                                max_length):
    path_ds = dataset_ops.Dataset.from_tensor_slices(file_paths)
    string_ds = path_ds.map(
        lambda x: path_to_string_content(x, max_length))
    if label_mode:
        label_ds = labels_to_dataset(labels, label_mode, num_classes)
        string_ds = dataset_ops.Dataset.zip((string_ds, label_ds))
    return string_ds


def path_to_string_content(path, max_length):
    txt = io_ops.read_file(path)
    if max_length is not None:
        txt = string_ops.substr(txt, 0, max_length)
    return txt


def text_dataset_from_directory(directory,
                                labels='inferred',
                                label_mode='int',
                                class_names=None,
                                batch_size=32,
                                max_length=None,
                                shuffle=True,
                                seed=None,
                                validation_split=None,
                                subset=None,
                                follow_links=False):
    if labels != 'inferred':
        if not isinstance(labels, (list, tuple)):
            raise ValueError(
                    '`labels` argument should be a list/tuple of integer labels, of '
                    'the same size as the number of text files in the target '
                    'directory. If you wish to infer the labels from the subdirectory '
                    'names in the target directory, pass `labels="inferred"`. '
                    'If you wish to get a dataset that only contains text samples '
                    '(no labels), pass `labels=None`.')
        if class_names:
            raise ValueError('You can only pass `class_names` if the labels are '
                             'inferred from the subdirectory names in the target '
                             'directory (`labels="inferred"`).')
    if label_mode not in {'int', 'categorical', 'binary', None}:
        raise ValueError(
                '`label_mode` argument must be one of "int", "categorical", "binary", '
                'or None. Received: %s' % (label_mode,))
    if seed is None:
        seed = np.random.randint(1e6)
    check_validation_split_arg(
        validation_split, subset, shuffle, seed)
    
    file_paths, labels, class_names = index_directory(
            directory,
            labels,
            formats=('.txt',),
            class_names=class_names,
            shuffle=shuffle,
            seed=seed,
            follow_links=follow_links)

    if label_mode == 'binary' and len(class_names) != 2:
        raise ValueError(
                'When passing `label_mode="binary"`, there must be exactly 2 classes. '
                'Found the following classes: %s' % (class_names,))

    file_paths, labels = get_training_or_validation_split(
        file_paths, labels, validation_split, subset)

    dataset = paths_and_labels_to_dataset(
        file_paths=file_paths,
        labels=labels,
        label_mode=label_mode,
        num_classes=len(class_names),
        max_length=max_length)
    if shuffle:
        dataset = dataset.shuffle(buffer_size=batch_size * 8, seed=seed)
    dataset = dataset.batch(batch_size)
    dataset.class_names = class_names
    return dataset
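
The functions above reproduce, in simplified form, what newer Keras versions ship as a built-in utility. If your TensorFlow is 2.3 or later, the commented call below (a sketch; it is not used in the rest of this notebook) should build an equivalent dataset without any of the code above:

In [ ]:
# A sketch, assuming TensorFlow >= 2.3 where this utility is part of the public API.
# raw_train_ds = tf.keras.preprocessing.text_dataset_from_directory(
#         train_dir,
#         batch_size=32,
#         validation_split=0.2,
#         subset='training',
#         seed=20200721)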

In [ ]:
# Load the data
batch_size = 32
validation_split = 0.2
seed = 20200721

print('Loading the training dataset.')
raw_train_ds = text_dataset_from_directory(
        train_dir, 
        batch_size=batch_size, 
        validation_split=validation_split, 
        subset='training', 
        seed=seed)

In [ ]:
# Inspect the loaded data
for text_batch, label_batch in raw_train_ds.take(1):
    for i in range(3):
        print("Review", text_batch.numpy()[i])
        print("Label", label_batch.numpy()[i])

In [ ]:
print('Check which class name each label index corresponds to.')
print("Label 0 corresponds to", raw_train_ds.class_names[0])
print("Label 1 corresponds to", raw_train_ds.class_names[1])

In [ ]:
print('Loading the validation dataset.')
raw_val_ds = text_dataset_from_directory(
        train_dir, 
        batch_size=batch_size, 
        validation_split=validation_split, 
        subset='validation', 
        seed=seed)

5. Preparing the training data: preprocessing (very hard!)


In [ ]:
def custom_standardization(input_data):
    lowercase = tf.strings.lower(input_data)
    stripped_html = tf.strings.regex_replace(lowercase, '<br />', ' ')
    return tf.strings.regex_replace(stripped_html,
                                    '[%s]' % re.escape(string.punctuation),
                                    '')
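
A quick sanity check (a sketch with a made-up review fragment) shows what the standardization does: lowercase the text, replace <br /> HTML tags with spaces, and strip punctuation.

In [ ]:
# A sketch with a hypothetical review fragment.
sample = tf.constant(['This movie was GREAT!<br />Loved every minute...'])
print(custom_standardization(sample))
# Expected (roughly): [b'this movie was great loved every minute']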

In [ ]:
max_features = 10000
sequence_length = 250

vectorize_layer = tf.keras.layers.experimental.preprocessing.TextVectorization(
        standardize=custom_standardization,
        max_tokens=max_features,
        output_mode='int',
        output_sequence_length=sequence_length)

In [ ]:
print('Separate the training text from its labels and build the vocabulary.')
train_text = raw_train_ds.map(lambda x, y: x)
vectorize_layer.adapt(train_text)
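
adapt() scans the training text and builds the vocabulary used for vectorization. A quick check (a sketch): the lowest indices are typically reserved for the padding token '' and the out-of-vocabulary token '[UNK]', followed by the most frequent words.

In [ ]:
# A sketch: inspect the first few vocabulary entries built by adapt().
print(vectorize_layer.get_vocabulary()[:5])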

In [ ]:
print('Loading the test set that will be used after training.')
test_dir = os.path.join(dataset_dir, 'test')
raw_test_ds = text_dataset_from_directory(
        test_dir, 
        batch_size=batch_size)

In [ ]:
def vectorize_text(text, label):
    text = tf.expand_dims(text, -1)
    return vectorize_layer(text), label

6. Preparing the training data: checking the preprocessing results


In [ ]:
print('Inspect one training example before and after vectorization.')
text_batch, label_batch = next(iter(raw_train_ds))
first_review, first_label = text_batch[0], label_batch[0]
print("Review", first_review)
print("Label", raw_train_ds.class_names[first_label])
print("Vectorized review", vectorize_text(tf.expand_dims(first_review, -1), first_label))

In [ ]:
print('Each integer in a vectorized review is an index into the vocabulary.')
voc_size = len(vectorize_layer.get_vocabulary())
print(f'Vocabulary size: {voc_size}')
base = 9900
cnt = 10
if voc_size - cnt < base:
    print(f'base must be {voc_size-cnt} or smaller.')
else:
    for i in range(base, base+cnt):
        print(f'{i:4d} ---> {vectorize_layer.get_vocabulary()[i].decode("utf-8")}')

In [ ]:
print('Vectorize the training, validation, and test data.')
train_ds = raw_train_ds.map(vectorize_text)
val_ds = raw_val_ds.map(vectorize_text)
test_ds = raw_test_ds.map(vectorize_text)

In [ ]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

print('Optimize the input pipeline.')
train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)

7. Defining and building the model


In [ ]:
embedding_dim = 16

print('Define the model.')
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(max_features+1, embedding_dim),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(1)])

model.summary()
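
The model averages the 16-dimensional embedding vectors over all token positions (GlobalAveragePooling1D) and maps the pooled vector to a single logit. A quick shape trace with a dummy batch (a sketch) confirms the flow:

In [ ]:
# A sketch: trace tensor shapes through the model with a dummy batch of token ids.
dummy = tf.zeros((4, sequence_length), dtype=tf.int64)  # (batch, tokens)
print(model(dummy).shape)  # expected: (4, 1) -- one logit per review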

In [ ]:
print('Compile the model.')
model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True), 
              optimizer='adam', 
              metrics=tf.metrics.BinaryAccuracy(threshold=0.0))
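
Because the last Dense layer has no activation, the model outputs raw logits. BinaryCrossentropy(from_logits=True) applies the sigmoid internally, and BinaryAccuracy(threshold=0.0) thresholds the logit at 0, which is equivalent to thresholding the sigmoid probability at 0.5. A tiny sketch of that equivalence:

In [ ]:
# A sketch: a logit of 0 corresponds to a probability of 0.5.
logits = tf.constant([-2.0, 0.0, 2.0])
print(tf.sigmoid(logits))  # roughly [0.12, 0.5, 0.88]; logit > 0  <=>  probability > 0.5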

8. Training


In [ ]:
epochs = 10
history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs)

9. Checking the training results


In [ ]:
print('Evaluate the model on the test set.')
loss, accuracy = model.evaluate(test_ds)

print("Loss: ", loss)
print("Accuracy: ", accuracy)

In [ ]:
history_dict = history.history
acc = history_dict['binary_accuracy']
val_acc = history_dict['val_binary_accuracy']
loss = history_dict['loss']
val_loss = history_dict['val_loss']

epochs = range(1, len(acc) + 1)

fig1 = plt.figure(figsize=(6, 10))
ax = fig1.add_subplot(2, 1, 1)
ax.plot(epochs, loss, 'bo', label='학습 손실')
ax.plot(epochs, val_loss, 'b', label='검증 손실')
ax.set_title('학습 및 검증 손실', fontproperties=nanum_prop, fontsize=12)
ax.set_xlabel('Epochs', fontproperties=nanum_prop, fontsize=10)
ax.set_ylabel('손실', fontproperties=nanum_prop, fontsize=10)
ax.legend(prop=nanum_prop)

ax = fig1.add_subplot(2, 1, 2)
ax.plot(epochs, acc, 'bo', label='학습 정확도')
ax.plot(epochs, val_acc, 'b', label='검증 정확도')
ax.set_title('학습 및 검증 정확도', fontproperties=nanum_prop, fontsize=12)
ax.set_xlabel('Epochs', fontproperties=nanum_prop, fontsize=10)
ax.set_ylabel('정확도', fontproperties=nanum_prop, fontsize=10)
ax.legend(prop=nanum_prop)

10. Prediction: play with it!


In [ ]:
print('Build an export model that takes raw text and outputs a probability.')
export_model = tf.keras.Sequential([
    vectorize_layer,
    model,
    tf.keras.layers.Activation('sigmoid')
])

export_model.compile(
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False), optimizer="adam", metrics=['accuracy']
)

# Test it with `raw_test_ds`, which yields raw strings
loss, accuracy = export_model.evaluate(raw_test_ds)
print(accuracy)

In [ ]:
user_str = input('Test input: ').strip()
print(f'Prediction: {export_model.predict(tf.expand_dims(user_str, -1))[0][0]}')
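
Since the export model ends with a sigmoid, the score is a probability: values near 1.0 lean positive and values near 0.0 lean negative. A batch of made-up sentences (a sketch) can be scored in one call:

In [ ]:
# A sketch with hypothetical example sentences.
examples = tf.constant([
    'The movie was fantastic, I loved every minute of it.',
    'What a waste of time, the plot made no sense at all.'
])
print(export_model.predict(examples))  # one score per review; closer to 1.0 means more positive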