In [172]:
# Render matplotlib figures inline in the notebook.
%matplotlib inline
import matplotlib as mpl
# NOTE(review): forcing the Agg backend right after %matplotlib inline is
# contradictory — inline already selected a backend, and Agg alone would
# disable inline display. Confirm which one is actually intended.
mpl.use('Agg')
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Activation, Reshape, Flatten, Dropout, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.layers.advanced_activations import LeakyReLU
import os
from keras.datasets import mnist
from keras.optimizers import Adam
from PIL import Image
import math
import matplotlib.pyplot as plt
# Fix the NumPy seed so noise vectors (and numpy-based init) are reproducible.
np.random.seed(7)
In [138]:
def generator_model():
    """Build the DCGAN generator.

    Maps a 100-dim noise vector to a 28x28x1 image in [-1, 1] (tanh
    output), upsampling 7x7 -> 14x14 -> 28x28 with two Conv2D stages.

    Returns:
        keras.models.Sequential: the (uncompiled) generator model.
    """
    model = Sequential()
    # Random generator input is a 100-dim noise vector.
    model.add(Dense(input_dim=100, units=1024))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dense(7 * 7 * 128))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    # Fix: the original also passed input_shape=(7*7*128,) here, but Keras
    # ignores input_shape on any layer that is not the first of a
    # Sequential model, so the argument was a misleading no-op.
    model.add(Reshape((7, 7, 128)))
    model.add(UpSampling2D((2, 2)))  # 7x7 -> 14x14
    model.add(Conv2D(64, (5, 5), padding='same'))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(UpSampling2D((2, 2)))  # 14x14 -> 28x28
    model.add(Conv2D(1, (5, 5), padding='same'))
    # tanh matches the training data normalized to [-1, 1].
    model.add(Activation('tanh'))
    return model
In [31]:
# Build the generator and inspect layer output shapes / parameter counts.
generator = generator_model()
generator.summary()
In [112]:
def discriminator_model():
    """Build the DCGAN discriminator.

    Takes a 28x28x1 image and outputs the probability (sigmoid) that the
    image is real. Downsampling is done with strided convolutions.

    Returns:
        keras.models.Sequential: the (uncompiled) discriminator model.
    """
    layers = [
        Conv2D(64, (5, 5),
               strides=(2, 2),
               padding='same',
               input_shape=(28, 28, 1)),
        LeakyReLU(0.2),
        Conv2D(128, (5, 5), strides=(2, 2)),
        LeakyReLU(0.2),
        Flatten(),
        Dense(256),
        LeakyReLU(0.2),
        Dropout(0.5),
        Dense(1),
        Activation('sigmoid'),
    ]
    model = Sequential()
    for layer in layers:
        model.add(layer)
    return model
In [75]:
# Build the discriminator and inspect layer output shapes / parameter counts.
discriminator = discriminator_model()
discriminator.summary()
In [102]:
def combine_images(generated_images):
    """Tile a batch of generated images into one 2-D grid image.

    Args:
        generated_images: array of shape (batch, height, width, channels);
            only the first channel is used.

    Returns:
        2-D array of shape (height * rows, width * cols) holding the tiled
        images; unused grid cells (when batch is not a perfect rectangle)
        stay zero.
    """
    total = generated_images.shape[0]
    cols = int(math.sqrt(total))
    rows = int(math.ceil(float(total) / cols))
    # Fix: shape[1] is the height and shape[2] the width. The original
    # unpacked them the other way around (and indexed rows by width),
    # which only happened to work for square images.
    height, width = generated_images.shape[1:3]
    combined_image = np.zeros((height * rows, width * cols),
                              dtype=generated_images.dtype)
    for index, image in enumerate(generated_images):
        i = index // cols  # grid row
        j = index % cols   # grid column
        combined_image[height * i:height * (i + 1),
                       width * j:width * (j + 1)] = image[:, :, 0]
    return combined_image
In [94]:
# Training configuration and model wiring for the DCGAN.
BATCH_SIZE = 36
NUM_EPOCH = 20
GENERATED_IMAGE_PATH = 'generated_images/'
MODEL_PATH = 'models/'
if not os.path.exists(GENERATED_IMAGE_PATH):
os.mkdir(GENERATED_IMAGE_PATH)
if not os.path.exists(MODEL_PATH):
os.mkdir(MODEL_PATH)
# Load MNIST (labels and the test split are unused).
(X_train, y_train), (_, _) = mnist.load_data()
# Normalize pixel values to [-1.0, 1.0] to match the generator's tanh output.
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
# Reshape to (batch, height, width, channels).
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
# Build the standalone discriminator model (trained directly on real/fake
# batches below).
discriminator = discriminator_model()
d_opt = Adam(lr=1e-5, beta_1=0.1)
discriminator.compile(loss='binary_crossentropy', optimizer=d_opt)
discriminator.summary()
# Build the stacked generator + discriminator model.
# The discriminator weights are frozen only inside `dcgan` (the standalone
# `discriminator` compiled above keeps training normally).
# A compile() call is needed for the trainable flag to take effect —
# visible in the summary() output below.
discriminator.trainable = False
generator = generator_model()
# `dcgan` feeds generator output straight into the discriminator.
dcgan = Sequential([generator, discriminator])
g_opt = Adam(lr=2e-4, beta_1=0.5)
dcgan.compile(loss='binary_crossentropy', optimizer=g_opt)
dcgan.summary()
In [101]:
# Main adversarial training loop: alternately update the discriminator on a
# mixed real/fake batch, then update the generator through the frozen dcgan.
num_batches = int(X_train.shape[0] / BATCH_SIZE)
print('Number of batches:', num_batches)
d_loss_history = []
g_loss_history = []
for epoch in range(NUM_EPOCH):
for index in range(num_batches):
# Make BATCH_SIZE noise vectors as the generator input.
noise = np.array([np.random.uniform(-1, 1, 100) for _ in range(BATCH_SIZE)])
# Real images (training data) for this batch.
image_batch = X_train[index * BATCH_SIZE:(index + 1) * BATCH_SIZE]
# Fake images produced by the current generator.
generated_images = generator.predict(noise, verbose=0)
# Update the discriminator on ONE batch that merges real and fake images.
# This merged-batch approach may not work well when the discriminator
# contains BatchNormalization layers:
# http://qiita.com/t-ae/items/236457c29ba85a7579d5
# The discriminator outputs the probability of "real", so real images
# are labeled 1 and generated images 0.
X = np.concatenate((image_batch, generated_images))
y = [1] * BATCH_SIZE + [0] * BATCH_SIZE
d_loss = discriminator.train_on_batch(X, y)
d_loss_history.append(d_loss)
# Update the generator: it wants its images classified as real, so the
# target label for every noise sample is 1.
noise = np.array([np.random.uniform(-1, 1, 100) for _ in range(BATCH_SIZE)])
g_loss = dcgan.train_on_batch(noise, [1] * BATCH_SIZE)
g_loss_history.append(g_loss)
print('epoch: %d, batch: %d, g_loss: %f, d_loss: %f' % (epoch, index, g_loss, d_loss))
# Save a tiled grid of the last generated batch at the end of each epoch.
image = combine_images(generated_images)
image = image * 127.5 + 127.5
Image.fromarray(image.astype(np.uint8)).save(GENERATED_IMAGE_PATH + 'epoch-%04d.png' % (epoch))
# Save model snapshots (filename embeds epoch and final batch loss).
generator.save('%s/generator-%03d-%.2f.h5' % (MODEL_PATH, epoch, g_loss))
discriminator.save('%s/discriminator-%03d-%.2f.h5' % (MODEL_PATH, epoch, d_loss))
In [103]:
# Persist both per-batch loss curves so they can be reloaded later.
for log_path, history in (('g_loss_history.log', g_loss_history),
                          ('d_loss_history.log', d_loss_history)):
    with open(log_path, 'w') as fp:
        fp.writelines('%f\n' % value for value in history)
In [118]:
# Reload the loss curves from disk (e.g. after a kernel restart).
# float() ignores the trailing newline, so no explicit rstrip is needed.
with open('g_loss_history.log', 'r') as fp:
    g_loss_history = [float(line) for line in fp]
with open('d_loss_history.log', 'r') as fp:
    d_loss_history = [float(line) for line in fp]
In [120]:
len(g_loss_history)
Out[120]:
In [123]:
# Plot generator vs. discriminator loss per training batch.
plt.plot(g_loss_history, label='g_loss')
plt.plot(d_loss_history, label='d_loss')
plt.xlabel('# of batches')
plt.ylabel('loss')
plt.xlim((0, len(g_loss_history)))
# Fix: labels were passed to plot() above but never rendered because
# legend() was never called.
plt.legend()
Out[123]:
In [130]:
In [130]:
# NOTE(review): this import shadows PIL's Image (imported at the top of the
# file); any PIL Image.fromarray call executed after this cell would break.
from IPython.display import Image
In [136]:
# Show the image grid produced after the first epoch.
Image('generated_images/epoch-0000.png', width=320, height=320)
Out[136]:
In [137]:
# Show the grid after the last epoch for comparison.
Image('generated_images/epoch-0019.png', width=320, height=320)
Out[137]:
In [161]:
# Both models are trainable immediately after construction.
modelA = Sequential([Dense(10, input_dim=100, activation='sigmoid')])
modelB = Sequential([Dense(100, input_dim=10, activation='sigmoid')])
# Compile modelB while it is still in the trainable state.
modelB.compile(optimizer='adam', loss='binary_crossentropy')
# From this point on modelB is trainable = False; models compiled AFTER
# this flag change (i.e. `connected`) will treat its weights as frozen.
modelB.trainable = False
connected = Sequential([modelA, modelB])
connected.compile(optimizer='adam', loss='binary_crossentropy')
In [162]:
modelA.summary()
In [163]:
connected.summary()
In [164]:
# Keep a copy of modelB's initial kernel weights for later comparison.
w0 = np.copy(modelB.layers[0].get_weights()[0])
In [165]:
X1 = np.random.random((32, 100))
X1.shape
Out[165]:
In [166]:
# Training `connected` does NOT update modelB's weights (frozen inside it).
connected.fit(X1, X1)
w1 = np.copy(modelB.layers[0].get_weights()[0])
print(np.array_equal(w0, w1))  # expected: True (unchanged)
In [167]:
# Fitting modelB directly DOES update its weights, because it was compiled
# while still trainable.
X2 = np.random.random((32, 10))
modelB.fit(X2, X1)
w2 = np.copy(modelB.layers[0].get_weights()[0])
print(np.array_equal(w1, w2))  # expected: False (changed)
In [168]:
# Fitting `connected` again still leaves modelB's weights untouched.
connected.fit(X1, X1)
w3 = np.copy(modelB.layers[0].get_weights()[0])
print(np.array_equal(w2, w3))  # expected: True (unchanged)
In [171]:
# model1: alternative architecture kept for experiments — tanh activations
# everywhere and a MaxPooling-based discriminator (original GAN-paper style
# rather than DCGAN strided convolutions).
# NOTE(review): these redefinitions shadow the generator_model /
# discriminator_model defined earlier in the notebook.
def generator_model():
model = Sequential()
# Random generator input is a 100-dim noise vector.
model.add(Dense(input_dim=100, units=1024))
model.add(Activation('tanh'))
model.add(Dense(7 * 7 * 128))
model.add(Activation('tanh'))
model.add(Reshape((7, 7, 128), input_shape=(7 * 7 * 128, )))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(64, (5, 5), padding='same'))
model.add(Activation('tanh'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(1, (5, 5), padding='same'))
model.add(Activation('tanh'))
return model
def discriminator_model():
model = Sequential()
model.add(Conv2D(64, (5, 5), padding='same', input_shape=(28, 28, 1)))
model.add(MaxPooling2D((2, 2)))
model.add(Activation('tanh'))
model.add(Conv2D(128, (5, 5)))
model.add(MaxPooling2D((2, 2)))
model.add(Activation('tanh'))
model.add(Flatten())
model.add(Dense(256))
model.add(Activation('tanh'))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('sigmoid'))
return model
In [ ]:
# model2: alternative architecture — relu generator / LeakyReLU
# discriminator but with MaxPooling instead of strided convolutions.
# (The original had a stray duplicated "# model1" label here.)
# NOTE(review): these redefinitions shadow the earlier generator_model /
# discriminator_model.
def generator_model():
model = Sequential()
# Random generator input is a 100-dim noise vector.
model.add(Dense(input_dim=100, units=1024))
model.add(Activation('relu'))
model.add(Dense(7 * 7 * 128))
model.add(Activation('relu'))
model.add(Reshape((7, 7, 128), input_shape=(7 * 7 * 128, )))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(64, (5, 5), padding='same'))
model.add(Activation('relu'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(1, (5, 5), padding='same'))
model.add(Activation('tanh'))
return model
def discriminator_model():
model = Sequential()
model.add(Conv2D(64, (5, 5), padding='same', input_shape=(28, 28, 1)))
model.add(MaxPooling2D((2, 2)))
model.add(LeakyReLU(0.2))
model.add(Conv2D(128, (5, 5)))
model.add(MaxPooling2D((2, 2)))
model.add(LeakyReLU(0.2))
model.add(Flatten())
model.add(Dense(256))
model.add(LeakyReLU(0.2))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('sigmoid'))
return model