In [ ]:
import os
# use float32 on the GPU; change 'cuda' to 'cpu' to run without a GPU
os.environ['THEANO_FLAGS'] = 'floatX=float32,device=cuda'
In [ ]:
from PIL import Image
import numpy as np
import tarfile
# download the CIFAR-10 dataset
url = "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
from urllib.request import urlretrieve
def reporthook(a, b, c):
    # a: blocks transferred so far, b: block size, c: total size
    print("\rdownloading: %5.1f%%" % (a * b * 100.0 / c), end="")
tar_gz = "cifar-10-python.tar.gz"
if not os.path.isfile(tar_gz):
    print('Downloading data from %s' % url)
    urlretrieve(url, tar_gz, reporthook=reporthook)
import pickle
train_X = []
train_y = []
with tarfile.open(tar_gz) as tarf:
    # five training batches of 10000 images each
    for i in range(1, 6):
        dataset = "cifar-10-batches-py/data_batch_%d" % i
        print("load", dataset)
        with tarf.extractfile(dataset) as f:
            result = pickle.load(f, encoding='latin1')
        # scale pixel values from [0, 255] to [-1, 1]
        train_X.extend(result['data'].reshape(-1, 3, 32, 32) / 255 * 2 - 1)
        train_y.extend(result['labels'])
    train_X = np.float32(train_X)
    train_y = np.int32(train_y)
    dataset = "cifar-10-batches-py/test_batch"
    print("load", dataset)
    with tarf.extractfile(dataset) as f:
        result = pickle.load(f, encoding='latin1')
    test_X = np.float32(result['data'].reshape(-1, 3, 32, 32) / 255 * 2 - 1)
    test_y = np.int32(result['labels'])
# merge train and test, then augment with horizontal flips
train_X = np.concatenate([train_X, test_X])
train_X = np.concatenate([train_X[:, :, :, ::-1], train_X])
train_y = np.concatenate([train_y, test_y, train_y, test_y])
# one-hot labels
train_Y = np.eye(10).astype('float32')[train_y]
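A quick sanity check on the assembled arrays can catch loading mistakes early. This is a minimal sketch; the expected shapes follow from merging the 50,000 training and 10,000 test images and doubling them with horizontal flips.
In [ ]:
# expect (120000, 3, 32, 32) images and matching label arrays
print(train_X.shape, train_X.dtype)
print(train_y.shape, train_Y.shape)
assert train_X.shape[0] == train_y.shape[0] == train_Y.shape[0]
assert train_X.min() >= -1 and train_X.max() <= 1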
In [ ]:
from IPython.display import display
def showX(X, rows=1):
    assert X.shape[0] % rows == 0
    # map from [-1, 1] back to [0, 255] and move channels last
    int_X = ((X + 1) / 2 * 255).clip(0, 255).astype('uint8')
    int_X = np.moveaxis(int_X, 1, 3)
    # tile the batch into a rows x (N/rows) grid of 32x32 images
    int_X_reshape = int_X.reshape(rows, -1, 32, 32, 3).swapaxes(1, 2).reshape(rows * 32, -1, 3)
    display(Image.fromarray(int_X_reshape))
# training data: the first 20 samples of X
showX(train_X[:20])
print(train_y[:20])
name_array = np.array("airplane automobile bird cat deer dog frog horse ship truck".split())
print(name_array[train_y[:20]])
In [ ]:
import numpy as np
import theano
import theano.tensor as T
import lasagne
from lasagne.layers import DenseLayer, DropoutLayer, ReshapeLayer, InputLayer, FlattenLayer, Upscale2DLayer, LocalResponseNormalization2DLayer
floatX = theano.config.floatX
from lasagne.layers import MaxPool2DLayer, Conv2DLayer, TransposedConv2DLayer
from lasagne.layers import batch_norm
In [ ]:
conv_init = lasagne.init.Normal(0.05, 0)
gamma_init = lasagne.init.Normal(0.02, 1)  # defined for batch_norm gamma, not used below
In [ ]:
def DCGAN_DC(isize, nz, nc, ndf, n_discrete, n_extra_layers=0):
    _ = InputLayer(shape=(None, nc, isize, isize))
    _ = Conv2DLayer(_, num_filters=ndf, filter_size=4, stride=2, pad=1, b=None, W=conv_init, flip_filters=False,
                    name='initial.conv.{0}-{1}'.format(nc, ndf),
                    nonlinearity=lasagne.nonlinearities.LeakyRectify(0.2))
    csize, cndf = isize // 2, ndf
    # halve the spatial size and double the filters until the feature map is small
    while csize > 5:
        in_feat = cndf
        out_feat = cndf * 2
        _ = Conv2DLayer(_, num_filters=out_feat, filter_size=4, stride=2, pad=1, b=None, W=conv_init,
                        flip_filters=False,
                        name='pyramid.{0}-{1}.conv'.format(in_feat, out_feat),
                        nonlinearity=lasagne.nonlinearities.LeakyRectify(0.2))
        if 0:  # change this line to turn on batch_norm
            _ = batch_norm(_, epsilon=1e-5)
        csize, cndf = csize // 2, cndf * 2
    # critic head: a single linear output per image
    D = Conv2DLayer(_, num_filters=1, filter_size=csize, stride=1, pad=0, b=None, W=conv_init,
                    flip_filters=False,
                    name='final.{0}-{1}.conv'.format(cndf, 1),
                    nonlinearity=None)
    D = FlattenLayer(D)
    # classifier head: shares the convolutional trunk with the critic
    C = Conv2DLayer(_, num_filters=cndf, filter_size=csize, stride=1, pad=0, b=None, W=conv_init,
                    flip_filters=False,
                    name='final.{0}-{1}.conv'.format(cndf, cndf),
                    nonlinearity=lasagne.nonlinearities.LeakyRectify(0.2))
    C = FlattenLayer(C)
    C = DenseLayer(C, 10, nonlinearity=lasagne.nonlinearities.softmax)
    return D, C
In [ ]:
def DCGAN_G(isize, nz, nc, ngf, n_extra_layers=0):
    # work out how many filters are needed at the smallest spatial size
    cngf = ngf // 2
    tisize = isize
    while tisize > 5:
        cngf = cngf * 2
        tisize = tisize // 2
    _ = InputLayer(shape=(None, nz))
    _ = ReshapeLayer(_, (-1, nz, 1, 1))
    _ = TransposedConv2DLayer(_, num_filters=cngf, filter_size=tisize, stride=1, crop=0, b=None,
                              W=conv_init,
                              name='initial.{0}-{1}.convt'.format(nz, cngf))
    _ = batch_norm(_, epsilon=1e-5)
    csize = tisize
    # double the spatial size and halve the filters until one step below isize
    while csize < isize // 2:
        in_feat = cngf
        out_feat = cngf // 2
        _ = TransposedConv2DLayer(_, num_filters=out_feat, filter_size=4, stride=2, crop=1, b=None, W=conv_init,
                                  name='pyramid.{0}-{1}.convt'.format(in_feat, out_feat))
        _ = batch_norm(_, epsilon=1e-5)
        csize, cngf = csize * 2, cngf // 2
    # final upsampling to isize, with tanh to match the [-1, 1] pixel range
    _ = TransposedConv2DLayer(_, num_filters=nc, filter_size=4, stride=2, crop=1, b=None, W=conv_init,
                              name='final.{0}-{1}.convt'.format(cngf, nc), nonlinearity=lasagne.nonlinearities.tanh)
    return _
In [ ]:
nc = 3            # image channels
nz = 32           # noise dimensions; the first n_discrete carry a one-hot class code
n_discrete = 10
ngf = 64          # base generator filters
ndf = 64          # base discriminator filters
n_extra_layers = 0
Diters = 5        # critic updates per generator update
imageSize = 32
batchSize = 64
lrD = 1e-4
lrG = 1e-4
# leftover from weight-clipping WGAN; unused here since a gradient penalty is used instead
clamp_lower, clamp_upper = -0.01, 0.01
In [ ]:
netD, netC = DCGAN_DC(imageSize, nz, nc, ndf, n_discrete, n_extra_layers)
for l in lasagne.layers.get_all_layers(netD):
    print(l.name, l.output_shape)
print()
for l in lasagne.layers.get_all_layers(netC):
    print(l.name, l.output_shape)
In [ ]:
netG = DCGAN_G(imageSize, nz, nc, ngf, n_extra_layers)
for l in lasagne.layers.get_all_layers(netG):
    print(l.name, l.output_shape)
In [ ]:
input_var_D = lasagne.layers.get_all_layers(netD)[0].input_var
input_var_G = lasagne.layers.get_all_layers(netG)[0].input_var
input_var_Y = T.matrix()
# per-sample interpolation coefficient, broadcastable over the channel and spatial axes
ϵ = T.TensorType(dtype=floatX, broadcastable=(False, True, True, True))()
In [ ]:
# always use batch statistics rather than running averages in these graphs
no_bn_avg = dict(batch_norm_update_averages=False,
                 batch_norm_use_averages=False)
output_D = lasagne.layers.get_output(netD, **no_bn_avg)
output_G = lasagne.layers.get_output(netG, **no_bn_avg)
output_D_fake = lasagne.layers.get_output(netD, inputs=output_G, **no_bn_avg)
output_C_fake = lasagne.layers.get_output(netC, inputs=output_G, **no_bn_avg)
output_C = lasagne.layers.get_output(netC, inputs=input_var_D, **no_bn_avg)
In [ ]:
# random per-sample interpolation between fake and real images
mixed_X = (ϵ * output_G) + (1 - ϵ) * input_var_D
In [ ]:
output_D_mixed = lasagne.layers.get_output(netD, inputs=mixed_X, **no_bn_avg)
In [ ]:
# gradient of the critic output w.r.t. the interpolated inputs, one gradient per sample
grad_mixed = T.grad(T.sum(output_D_mixed), mixed_X)
norm_grad_mixed = T.sqrt(T.sum(T.square(grad_mixed), axis=[1, 2, 3]))
grad_penalty = T.mean(T.square(norm_grad_mixed - 1))
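For reference, the two cells above implement the gradient penalty from WGAN-GP (Gulrajani et al., 2017). With $\hat{x} = \epsilon\, G(z) + (1-\epsilon)\, x$ sampled on lines between fake and real images, the critic loss assembled in the next cell is

$$L_D = \mathbb{E}[D(G(z))] - \mathbb{E}[D(x)] + \lambda\, \mathbb{E}\big[(\lVert \nabla_{\hat{x}} D(\hat{x}) \rVert_2 - 1)^2\big], \qquad \lambda = 10.$$

`T.grad(T.sum(output_D_mixed), mixed_X)` recovers the per-sample gradients, since each critic output depends only on its own input sample.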
In [ ]:
loss_C_fake = lasagne.objectives.categorical_crossentropy(output_C_fake,
                                                          input_var_G[:, :n_discrete]).mean()
loss_C_real = lasagne.objectives.categorical_crossentropy(output_C,
                                                          input_var_Y).mean()
loss_D_real = output_D.mean()
loss_D_fake = output_D_fake.mean()
loss_D = loss_D_fake - loss_D_real
# critic loss with gradient penalty, plus a small supervised term to train the classifier
loss_D_gp = loss_D + 10 * grad_penalty + 0.01 * loss_C_real
# the generator also minimizes the classifier's loss against its own class codes
loss_G = -loss_D_fake + loss_C_fake
params_netD = lasagne.layers.get_all_params(netD, trainable=True)
params_netC = lasagne.layers.get_all_params(netC, trainable=True)
params_netG = lasagne.layers.get_all_params(netG, trainable=True)
optimize_G = lasagne.updates.adam(loss_G, params_netG, learning_rate=lrG, beta1=0.)
optimize_D = lasagne.updates.adam(loss_D_gp, params_netD + params_netC,
                                  learning_rate=lrD, beta1=0.)
train_G_fn = theano.function([input_var_G], [loss_G], updates=optimize_G)
train_D_fn = theano.function([input_var_D, input_var_G, input_var_Y, ϵ],
                             [loss_D_gp, loss_D, loss_D_real, loss_D_fake, loss_C_fake],
                             updates=optimize_D)
generator_fn = theano.function([input_var_G], output_G)
In [17]:
def noise_generator(batchSize, nz, num_discrete, random_y=None):
    # the first num_discrete entries hold a one-hot class code; the rest are Gaussian
    noise = np.zeros(shape=(batchSize, nz), dtype='float32')
    noise[:, num_discrete:] = np.random.normal(size=(batchSize, nz - num_discrete))
    if random_y is None:
        random_y = np.random.randint(0, num_discrete, size=batchSize)
    else:
        assert len(random_y) == batchSize
    noise[np.arange(batchSize), random_y] = 1
    return noise
# a fixed 10x10 grid, ten samples per class, for monitoring progress
fixed_noise = noise_generator(n_discrete*10, nz, n_discrete, [i//10 for i in range(100)])
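Before committing to the long training loop, a quick smoke test can confirm the noise layout and that the compiled functions accept the expected shapes. A minimal sketch, assuming all the cells above have been run (note it performs one real parameter update per function):
In [ ]:
demo = noise_generator(4, nz, n_discrete, [0, 1, 2, 3])
print(demo[:, :n_discrete])  # one-hot class codes occupy the first 10 columns
_noise = noise_generator(batchSize, nz, n_discrete)
_eps = np.random.uniform(size=(batchSize, 1, 1, 1)).astype('float32')
# one critic update on the first real batch, then one generator update
print(train_D_fn(train_X[:batchSize], _noise, train_Y[:batchSize], _eps))
print(train_G_fn(noise_generator(batchSize, nz, n_discrete)))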
In [18]:
import time
t0 = time.time()
niter = 1000
gen_iterations = 0
for epoch in range(niter):
    i = 0
    # optionally reshuffle every epoch (shuffle X and Y together to keep labels aligned)
    #np.random.shuffle(train_X)
    batches = train_X.shape[0]//batchSize
    while i < batches:
        # warm up the critic: extra updates early on and every 500 generator steps
        if gen_iterations < 25 or gen_iterations % 500 == 0:
            _Diters = 100
        else:
            _Diters = Diters
        j = 0
        while j < _Diters and i < batches:
            j += 1
            #clamp_D_fn()
            real_data = train_X[i*batchSize:(i+1)*batchSize]
            real_Y = train_Y[i*batchSize:(i+1)*batchSize]
            i += 1
            noise = noise_generator(batchSize, nz, n_discrete)
            random_epsilon = np.random.uniform(size=(batchSize, 1, 1, 1)).astype('float32')
            errD_gp, errD, errD_real, errD_fake, errC_fake = train_D_fn(real_data, noise, real_Y, random_epsilon)
        if gen_iterations % 2500 == 0:
            fake = generator_fn(fixed_noise)
            showX(fake, 10)
        noise = noise_generator(batchSize, nz, n_discrete)
        errG = train_G_fn(noise)[0]
        if gen_iterations % 500 == 0:
            print('[%d/%d][%d/%d][%d] Loss_D: %f Loss_G: %f \nLoss_D_real: %f Loss_D_fake %f Loss_C_fake %f'
                  % (epoch, niter, i, batches, gen_iterations,
                     errD, errG, errD_real, errD_fake, errC_fake), time.time()-t0)
        gen_iterations += 1
In [19]:
fake = generator_fn(fixed_noise)
showX(fake, 10)
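Training for the full 1000 epochs takes a long time, so it is worth saving the learned weights. A minimal sketch using Lasagne's parameter helpers; the file name is only illustrative:
In [ ]:
# save the generator weights (file name is an example)
np.savez('netG_cifar10.npz', *lasagne.layers.get_all_param_values(netG))
# to restore later:
# with np.load('netG_cifar10.npz') as f:
#     values = [f['arr_%d' % i] for i in range(len(f.files))]
# lasagne.layers.set_all_param_values(netG, values)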