In [156]:
import time
import os
import gzip
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
In [83]:
# Apply a layer-type-specific initialization to every layer of the given model
def initialize_weights(net):
    for i, m in enumerate(net.modules()):
        if isinstance(m, nn.Conv2d):
            m.weight.data.normal_(0, 0.02)
            m.bias.data.zero_()
        elif isinstance(m, nn.ConvTranspose2d):
            # randomly initialize the weight tensor,
            # zero the bias; the trailing _ means in-place
            m.weight.data.normal_(0, 0.02)
            m.bias.data.zero_()
        elif isinstance(m, nn.Linear):
            m.weight.data.normal_(0, 0.02)
            m.bias.data.zero_()
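A quick sanity check, a minimal sketch on a throwaway module: run the initializer and confirm the weights look like N(0, 0.02) with zeroed biases.
In [ ]:
# Sanity check: initialize a small module and inspect the statistics
net = nn.Sequential(nn.Conv2d(1, 8, 3), nn.Linear(10, 10))
initialize_weights(net)
for m in net.modules():
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        print(type(m).__name__,
              'weight std: %.4f' % m.weight.data.std(),
              'max |bias|: %.4f' % m.bias.data.abs().max())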
In [84]:
class generator(nn.Module):
    def __init__(self, dataset='mnist'):
        super(generator, self).__init__()
        if dataset == 'mnist':
            self.input_height = 28
            self.input_width = 28
            self.input_dim = 62  # dimensionality of the random noise z
            self.output_dim = 1
        self.fc = nn.Sequential(
            nn.Linear(self.input_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            # project to 1/4 of the target spatial size,
            # since the deconv stack upsamples by a factor of 4
            nn.Linear(1024, 128 * (self.input_height // 4) * (self.input_width // 4)),
            nn.BatchNorm1d(128 * (self.input_height // 4) * (self.input_width // 4)),
            nn.ReLU(),
        )
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, self.output_dim, 4, 2, 1),
            nn.Sigmoid(),
        )
        # the initialization strategy depends on the layer type
        initialize_weights(self)

    def forward(self, input):
        x = self.fc(input)
        x = x.view(-1, 128, (self.input_height // 4), (self.input_width // 4))
        x = self.deconv(x)
        return x

G = generator('mnist')
print(G)
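To check the shapes, a minimal sketch that pushes a small batch of noise through the generator (batch size 8 is arbitrary):
In [ ]:
# The output should be (batch, 1, 28, 28) with values in (0, 1)
# because of the final Sigmoid
z = Variable(torch.rand(8, 62))
fake = G(z)
print(fake.size())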
In [96]:
class discriminator(nn.Module):
    def __init__(self, dataset='mnist'):
        super(discriminator, self).__init__()
        if dataset == 'mnist':
            self.input_height = 28
            self.input_width = 28
            self.input_dim = 1
            self.output_dim = 1
        self.conv = nn.Sequential(
            nn.Conv2d(self.input_dim, 64, 4, 2, 1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, 4, 2, 1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
        )
        self.fc = nn.Sequential(
            nn.Linear(128 * (self.input_height // 4) * (self.input_width // 4), 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(0.2),
            nn.Linear(1024, self.output_dim),
            nn.Sigmoid(),
        )
        initialize_weights(self)

    def forward(self, input):
        x = self.conv(input)
        x = x.view(-1, 128 * (self.input_height // 4) * (self.input_width // 4))
        x = self.fc(x)
        return x

D = discriminator('mnist')
print(D)
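Chaining the two networks, D(G(z)) should yield one probability per generated image, a minimal sketch:
In [ ]:
# End-to-end shape check: the discriminator returns (batch, 1)
# probabilities for a batch of generated images
z = Variable(torch.rand(8, 62))
prob = D(G(z))
print(prob.size())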
In [115]:
class GAN(object):
    def __init__(self):
        self.epoch = 25
        self.sample_num = 16
        self.batch_size = 64
        self.save_dir = 'models'
        self.result_dir = 'results'
        self.dataset = 'mnist'
        self.log_dir = 'logs'
        self.model_name = 'GAN'
        self.gpu_mode = torch.cuda.is_available()

        self.G = generator(self.dataset)
        self.D = discriminator(self.dataset)
        self.G_optimizer = optim.Adam(self.G.parameters(), lr=0.0002, betas=(0.5, 0.999))
        self.D_optimizer = optim.Adam(self.D.parameters(), lr=0.0002, betas=(0.5, 0.999))

        if self.gpu_mode:
            self.G.cuda()
            self.D.cuda()
            # D is a binary classifier, so Binary Cross Entropy loss is used
            # (does the loss module itself need .cuda()?)
            self.BCE_loss = nn.BCELoss().cuda()
        else:
            self.BCE_loss = nn.BCELoss()

        print(self.G)
        print(self.D)

        # load dataset
        if self.dataset == 'mnist':
            d = datasets.MNIST('data/mnist', train=True, download=True,
                               transform=transforms.Compose([transforms.ToTensor()]))
            self.data_loader = DataLoader(d, batch_size=self.batch_size, shuffle=True)
            self.z_dim = 62

        # fixed noise used for sampling (volatile: no gradients needed)
        if self.gpu_mode:
            self.sample_z = Variable(torch.rand((self.batch_size, self.z_dim)).cuda(), volatile=True)
        else:
            self.sample_z = Variable(torch.rand((self.batch_size, self.z_dim)), volatile=True)

    def train(self):
        self.train_hist = {}
        self.train_hist['D_loss'] = []
        self.train_hist['G_loss'] = []
        self.train_hist['per_epoch_time'] = []
        self.train_hist['total_time'] = []

        if self.gpu_mode:
            self.y_real = Variable(torch.ones(self.batch_size, 1).cuda())
            self.y_fake = Variable(torch.zeros(self.batch_size, 1).cuda())
        else:
            self.y_real = Variable(torch.ones(self.batch_size, 1))
            self.y_fake = Variable(torch.zeros(self.batch_size, 1))

        self.D.train()  # switch to training mode (affects Dropout and BatchNorm)
        print('training start!!')
        start_time = time.time()
        for epoch in range(self.epoch):
            self.G.train()
            epoch_start_time = time.time()
            for iter, (x, _) in enumerate(self.data_loader):
                # skip the final incomplete batch
                if iter == len(self.data_loader.dataset) // self.batch_size:
                    break
                z = torch.rand((self.batch_size, self.z_dim))
                if self.gpu_mode:
                    x, z = Variable(x.cuda()), Variable(z.cuda())
                else:
                    x, z = Variable(x), Variable(z)

                # update D network
                self.D_optimizer.zero_grad()

                # x is a batch of real images; its target is y_real,
                # so take the loss against that
                D_real = self.D(x)
                D_real_loss = self.BCE_loss(D_real, self.y_real)

                # G is a batch of fake images from the generator; its
                # target is y_fake, so take the loss against that
                G = self.G(z)  # (64, 1, 28, 28)
                D_fake = self.D(G)
                D_fake_loss = self.BCE_loss(D_fake, self.y_fake)

                D_loss = D_real_loss + D_fake_loss
                self.train_hist['D_loss'].append(D_loss.data[0])
                D_loss.backward()
                self.D_optimizer.step()

                # update G network
                self.G_optimizer.zero_grad()
                G = self.G(z)  # (64, 1, 28, 28)
                D_fake = self.D(G)
                # the generator wants D to classify its fakes as real,
                # so the target here is y_real
                G_loss = self.BCE_loss(D_fake, self.y_real)
                self.train_hist['G_loss'].append(G_loss.data[0])
                G_loss.backward()
                self.G_optimizer.step()

                # print intermediate results every 100 batches
                if ((iter + 1) % 100) == 0:
                    print('Epoch: [%2d] [%4d/%4d] D_loss: %.8f, G_loss: %.8f' %
                          (epoch + 1,
                           iter + 1,  # index of the current batch
                           len(self.data_loader.dataset) // self.batch_size,  # batches per epoch
                           D_loss.data[0],
                           G_loss.data[0]))

            self.train_hist['per_epoch_time'].append(time.time() - epoch_start_time)
            # (visualization of the generated images would go here)

        self.train_hist['total_time'].append(time.time() - start_time)
        print('Avg one epoch time: %.2f, total %d epochs time: %.2f' %
              (np.mean(self.train_hist['per_epoch_time']),
               self.epoch,
               self.train_hist['total_time'][0]))
        print('Training finish! ... save training results')

gan = GAN()
gan.train()
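After training, samples can be drawn from the fixed noise prepared in __init__, a minimal sketch (eval mode matters here because of BatchNorm):
In [ ]:
# Sample from the trained generator with the fixed noise batch
gan.G.eval()  # use BatchNorm running statistics, not batch statistics
samples = gan.G(gan.sample_z)
print(samples.size())  # (64, 1, 28, 28)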
In [90]:
def print_network(net):
    num_params = 0
    for param in net.parameters():
        num_params += param.numel()
    print(net)
    print('Total number of parameters: %d' % num_params)
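For example, applied to the generator defined above:
In [ ]:
print_network(G)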
In [102]:
d = datasets.MNIST('data/mnist', train=True, download=True,
                   transform=transforms.Compose([transforms.ToTensor()]))
data_loader = DataLoader(d, batch_size=10000, shuffle=True)
for iter, (x, _) in enumerate(data_loader):
    print(iter, x.size())
print(len(data_loader.dataset))
In [117]:
class generator(nn.Module):
    def __init__(self, dataset='mnist'):
        super(generator, self).__init__()
        if dataset == 'mnist':
            self.input_height = 28
            self.input_width = 28
            self.input_dim = 62 + 10  # noise dimension + one-hot class label
            self.output_dim = 1
        self.fc = nn.Sequential(
            nn.Linear(self.input_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            # project to 1/4 of the target spatial size,
            # since the deconv stack upsamples by a factor of 4
            nn.Linear(1024, 128 * (self.input_height // 4) * (self.input_width // 4)),
            nn.BatchNorm1d(128 * (self.input_height // 4) * (self.input_width // 4)),
            nn.ReLU(),
        )
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, self.output_dim, 4, 2, 1),
            nn.Sigmoid(),
        )
        # the initialization strategy depends on the layer type
        initialize_weights(self)

    def forward(self, input, label):
        # shape debugging: the label is concatenated onto the noise vector
        print('before gen cat:', input.size(), label.size())
        x = torch.cat([input, label], 1)
        print('after gen cat:', x.size())
        x = self.fc(x)
        x = x.view(-1, 128, (self.input_height // 4), (self.input_width // 4))
        x = self.deconv(x)
        return x

G = generator('mnist')
print(G)
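The conditional generator takes the class as a one-hot vector concatenated onto the noise. A minimal sketch that conditions a batch on the digit 3 (an arbitrary choice):
In [ ]:
# Generate a batch conditioned on class 3 via a one-hot label
z = Variable(torch.rand(8, 62))
y = torch.zeros(8, 10)
y[:, 3] = 1
fake = G(z, Variable(y))
print(fake.size())  # (8, 1, 28, 28)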
In [194]:
class discriminator(nn.Module):
    def __init__(self, dataset='mnist'):
        super(discriminator, self).__init__()
        if dataset == 'mnist':
            self.input_height = 28
            self.input_width = 28
            self.input_dim = 1 + 10  # image channel + class-label channels
            self.output_dim = 1
        self.conv = nn.Sequential(
            nn.Conv2d(self.input_dim, 64, 4, 2, 1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, 4, 2, 1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
        )
        self.fc = nn.Sequential(
            nn.Linear(128 * (self.input_height // 4) * (self.input_width // 4), 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(0.2),
            nn.Linear(1024, self.output_dim),
            nn.Sigmoid(),
        )
        initialize_weights(self)

    def forward(self, input, label):
        # shape debugging: the label planes are concatenated channel-wise
        print('before disc cat:', input.size(), label.size())
        x = torch.cat([input, label], 1)
        print('after disc cat:', x.size())
        x = self.conv(x)
        x = x.view(-1, 128 * (self.input_height // 4) * (self.input_width // 4))
        x = self.fc(x)
        return x

D = discriminator('mnist')
print(D)
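The discriminator instead expects the label as 10 constant feature maps, so it can be concatenated with the image along the channel axis. A minimal sketch of that expansion, using random noise as a stand-in for real images:
In [ ]:
# Expand a one-hot label to (batch, 10, 28, 28): channel c is all
# ones exactly when the label is c
y = torch.zeros(8, 10)
y[:, 3] = 1
y_fill = y.view(8, 10, 1, 1).expand(8, 10, 28, 28)
x = Variable(torch.rand(8, 1, 28, 28))  # stand-in for real images
print(D(x, Variable(y_fill)).size())  # (8, 1)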
In [195]:
def load_mnist(dataset):
    data_dir = os.path.join("./data", dataset)

    def extract_data(filename, num_data, head_size, data_size):
        # read a raw IDX file: skip the header, then read the payload
        with gzip.open(filename) as bytestream:
            bytestream.read(head_size)
            buf = bytestream.read(data_size * num_data)
            data = np.frombuffer(buf, dtype=np.uint8).astype(np.float)
        return data

    data = extract_data(data_dir + '/train-images-idx3-ubyte.gz', 60000, 16, 28 * 28)
    trX = data.reshape((60000, 28, 28, 1))
    data = extract_data(data_dir + '/train-labels-idx1-ubyte.gz', 60000, 8, 1)
    trY = data.reshape((60000))
    data = extract_data(data_dir + '/t10k-images-idx3-ubyte.gz', 10000, 16, 28 * 28)
    teX = data.reshape((10000, 28, 28, 1))
    data = extract_data(data_dir + '/t10k-labels-idx1-ubyte.gz', 10000, 8, 1)
    teY = data.reshape((10000))

    trY = np.asarray(trY).astype(np.int)
    teY = np.asarray(teY)

    # merge train and test into one 70000-sample set
    X = np.concatenate((trX, teX), axis=0)
    y = np.concatenate((trY, teY), axis=0).astype(np.int)

    # shuffle images and labels with the same seed so they stay aligned
    seed = 547
    np.random.seed(seed)
    np.random.shuffle(X)
    np.random.seed(seed)
    np.random.shuffle(y)

    # one-hot encode the labels
    y_vec = np.zeros((len(y), 10), dtype=np.float)
    for i, label in enumerate(y):
        y_vec[i, y[i]] = 1

    X = X.transpose(0, 3, 1, 2) / 255.  # NHWC -> NCHW, scaled to [0, 1]
    # y_vec = y_vec.transpose(0, 3, 1, 2)

    X = torch.from_numpy(X).type(torch.FloatTensor)
    y_vec = torch.from_numpy(y_vec).type(torch.FloatTensor)
    return X, y_vec
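A quick look at what load_mnist returns, assuming the four raw MNIST .gz files sit directly under ./data/mnist:
In [ ]:
# X: (70000, 1, 28, 28) floats in [0, 1]; y_vec: (70000, 10) one-hot
X, y_vec = load_mnist('mnist')
print(X.size(), y_vec.size())
print(y_vec[0])  # one-hot row of the first (shuffled) sample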
In [197]:
class CGAN(object):
    def __init__(self):
        self.epoch = 25
        self.sample_num = 100
        self.batch_size = 64
        self.save_dir = 'models'
        self.result_dir = 'results'
        self.dataset = 'mnist'
        self.log_dir = 'logs'
        self.model_name = 'GAN'
        self.gpu_mode = torch.cuda.is_available()

        self.G = generator(self.dataset)
        self.D = discriminator(self.dataset)
        self.G_optimizer = optim.Adam(self.G.parameters(), lr=0.0002, betas=(0.5, 0.999))
        self.D_optimizer = optim.Adam(self.D.parameters(), lr=0.0002, betas=(0.5, 0.999))

        if self.gpu_mode:
            self.G.cuda()
            self.D.cuda()
            # D is a binary classifier, so Binary Cross Entropy loss is used
            # (does the loss module itself need .cuda()?)
            self.BCE_loss = nn.BCELoss().cuda()
        else:
            self.BCE_loss = nn.BCELoss()

        # print(self.G)
        # print(self.D)

        # load mnist
        # data_X: [70000, 1, 28, 28]
        # data_Y: [70000, 10] 1-of-K
        self.data_X, self.data_Y = load_mnist('mnist')
        self.z_dim = 62
        self.y_dim = 10

    def train(self):
        if self.gpu_mode:
            self.y_real = Variable(torch.ones(self.batch_size, 1).cuda())
            self.y_fake = Variable(torch.zeros(self.batch_size, 1).cuda())
        else:
            self.y_real = Variable(torch.ones(self.batch_size, 1))
            self.y_fake = Variable(torch.zeros(self.batch_size, 1))

        # fill[c] is a (10, 28, 28) stack whose channel c is all ones;
        # it feeds the class label to D as image-sized planes
        self.fill = torch.zeros([10, 10, self.data_X.size()[2], self.data_X.size()[3]])
        for i in range(10):
            self.fill[i, i, :, :] = 1

        self.D.train()
        print('training start!!')
        for epoch in range(self.epoch):
            self.G.train()
            for iter in range(len(self.data_X) // self.batch_size):
                x = self.data_X[iter * self.batch_size: (iter + 1) * self.batch_size]
                z = torch.rand((self.batch_size, self.z_dim))
                y_vec = self.data_Y[iter * self.batch_size: (iter + 1) * self.batch_size]
                # look up the label-plane stack for each sample's class index
                y_fill = self.fill[torch.max(y_vec, 1)[1].squeeze()]
                if self.gpu_mode:
                    x, z, y_vec, y_fill = (Variable(x.cuda()), Variable(z.cuda()),
                                           Variable(y_vec.cuda()), Variable(y_fill.cuda()))
                else:
                    x, z, y_vec, y_fill = Variable(x), Variable(z), Variable(y_vec), Variable(y_fill)

                # update D network
                self.D_optimizer.zero_grad()
                D_real = self.D(x, y_fill)
                D_real_loss = self.BCE_loss(D_real, self.y_real)

                G = self.G(z, y_vec)
                D_fake = self.D(G, y_fill)
                D_fake_loss = self.BCE_loss(D_fake, self.y_fake)

                D_loss = D_real_loss + D_fake_loss
                D_loss.backward()
                self.D_optimizer.step()

                # update G network
                self.G_optimizer.zero_grad()
                G = self.G(z, y_vec)
                D_fake = self.D(G, y_fill)
                # the generator wants D to classify its fakes as real
                G_loss = self.BCE_loss(D_fake, self.y_real)
                G_loss.backward()
                self.G_optimizer.step()

gan = CGAN()
gan.train()
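Once trained, the label input steers which digit comes out. A minimal sketch that generates a batch of the digit 7 (an arbitrary choice):
In [ ]:
# Generate a batch of the digit 7 by fixing the one-hot condition
gan.G.eval()
z = torch.rand(8, 62)
y = torch.zeros(8, 10)
y[:, 7] = 1
if gan.gpu_mode:
    z, y = z.cuda(), y.cuda()
samples = gan.G(Variable(z), Variable(y))
print(samples.size())  # (8, 1, 28, 28)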