We are going to create a class that defines a convolution layer. This layer will be part of a LeNet network, as shown in the figure:
As the figure shows, a convolution layer is made up of:
- a convolution stage, which convolves the input feature maps with a bank of learnable filters,
- a max-pooling (subsampling) stage, which downsamples each resulting feature map, and
- a non-linearity (tanh in our case), applied after adding a per-feature-map bias.
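In symbols, writing x for the input feature maps, W for the filter bank and b for the per-map bias, the layer sketched here (and implemented below as LeNetConvPoolLayer) computes roughly:

output = tanh( max_pool( conv(x, W) ) + b )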
Let's first import the libraries we need (among them the CapaOculta and LogisticRegression classes that we built in the previous Notebooks):
In [ ]:
import time
import scipy.io as io
import numpy
import theano
import theano.tensor as T
from theano.tensor.signal import downsample # needed for max-pooling
from theano.tensor.nnet import conv # convolution function from Theano's nnet module
from mlp import CapaOculta, LogisticRegression # our layers defined in previous examples
Theano provides a function for max-pooling: theano.tensor.signal.downsample.max_pool_2d.
As input to the function we must provide:
- the input tensor (pooling is applied over its last two dimensions),
- the downsampling factor ds, a tuple of length 2 (rows and columns of the pooling window), and
- the ignore_border flag: with True, border elements that do not fill a complete pooling window are discarded; with False, they are pooled as well.
For example (note the difference between the two outputs):
In [ ]:
from theano.tensor.signal import downsample
input = T.dtensor4('input')
maxpool_shape = (2, 2)
pool_out = downsample.max_pool_2d(input, maxpool_shape, ignore_border=True)
f = theano.function([input],pool_out)
invals = numpy.random.RandomState(1).rand(3, 2, 5, 5)
print invals.shape
print 'With ignore_border set to True:'
print 'invals[0, 0, :, :] =\n', invals[0, 0, :, :]
print 'output[0, 0, :, :] =\n', f(invals)[0, 0, :, :]
pool_out = downsample.max_pool_2d(input, maxpool_shape, ignore_border=False)
f = theano.function([input],pool_out)
print 'With ignore_border set to False:'
print 'invals[1, 0, :, :] =\n ', invals[1, 0, :, :]
print 'output[1, 0, :, :] =\n ', f(invals)[1, 0, :, :]
In the previous example a 4-dimensional tensor was created. In a practical case these dimensions would correspond to: the mini-batch size, the number of feature maps (channels), the image height and the image width.
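As a minimal sketch of that interpretation (the shapes below are made up for illustration), pooling a mini-batch of 3-channel 20x20 inputs with a (2, 2) window halves each spatial dimension:
In [ ]:
batch = T.dtensor4('batch')  # (batch size, feature maps, height, width)
pooled = downsample.max_pool_2d(batch, (2, 2), ignore_border=True)
g = theano.function([batch], pooled)
imgs = numpy.random.RandomState(42).rand(500, 3, 20, 20)
print g(imgs).shape  # (500, 3, 10, 10)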
For the convolution operation itself, Theano provides the function theano.tensor.nnet.conv2d (used below as conv.conv2d).
This function takes two inputs:
- a 4D tensor of inputs, with shape (batch size, num input feature maps, image height, image width), and
- a 4D tensor of filters W, with shape (number of filters, num input feature maps, filter height, filter width).
In [ ]:
import theano
from theano import tensor as T
from theano.tensor.nnet import conv
import numpy
rng = numpy.random.RandomState(23455)
# instantiate 4D tensor for input
input = T.tensor4(name='input')
# initialize shared variable for weights.
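# 2 filters, 3 input channels, 9x9 kernels; the uniform range is scaled by 1/sqrt(fan_in), with fan_in = 3 * 9 * 9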
w_shp = (2, 3, 9, 9)
w_bound = numpy.sqrt(3 * 9 * 9)
W = theano.shared( numpy.asarray(
rng.uniform(
low=-1.0 / w_bound,
high=1.0 / w_bound,
size=w_shp),
dtype=input.dtype), name ='W')
# initialize shared variable for bias (1D tensor) with random values
b_shp = (2,)
b = theano.shared(numpy.asarray(
rng.uniform(low=-.5, high=.5, size=b_shp),
dtype=input.dtype), name ='b')
# build symbolic expression that computes the convolution of input with filters in w
conv_out = conv.conv2d(input, W)
# build symbolic expression to add bias and apply activation function, i.e.
#produce neural net layer output
output = T.nnet.sigmoid(conv_out + b.dimshuffle('x', 0, 'x', 'x'))
# create theano function to compute filtered images
f = theano.function([input], output)
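As a quick sanity check of f's output shape (using a throwaway random input; conv2d's default border_mode is 'valid', so each spatial dimension shrinks from 221 to 221 - 9 + 1 = 213):
In [ ]:
dummy = numpy.random.rand(1, 3, 221, 221).astype(theano.config.floatX)
print f(dummy).shape  # (1, 2, 213, 213): one image, two filtered feature maps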
In [ ]:
import numpy
import pylab
from PIL import Image
# open an example image of dimensions 221x221
img = Image.open(open('images/teacher.jpg'))
# dimensions are (height, width, channel)
img = numpy.asarray(img, dtype='float32') / 256.
# put image in 4D tensor of shape (1, 3, height, width)
img_ = img.transpose(2, 0, 1).reshape(1, 3, 221, 221)
filtered_img = f(img_)
# plot original image and first and second components of output
pylab.subplot(1, 3, 1); pylab.axis('off'); pylab.imshow(img)
pylab.gray();
# recall that the convOp output (filtered image) is actually a "minibatch",
# of size 1 here, so we take index 0 in the first dimension:
pylab.subplot(1, 3, 2); pylab.axis('off'); pylab.imshow(filtered_img[0, 0, :, :])
pylab.subplot(1, 3, 3); pylab.axis('off'); pylab.imshow(filtered_img[0, 1, :, :])
pylab.show()
In [ ]:
class LeNetConvPoolLayer(object):
"""Pool Layer of a convolutional network """
def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
"""
Allocate a LeNetConvPoolLayer with shared variable internal parameters.
:type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights
:type input: theano.tensor.dtensor4
:param input: symbolic image tensor, of shape image_shape
:type filter_shape: tuple or list of length 4
:param filter_shape: (number of filters, num input feature maps,
filter height, filter width)
:type image_shape: tuple or list of length 4
:param image_shape: (batch size, num input feature maps,
image height, image width)
:type poolsize: tuple or list of length 2
:param poolsize: the downsampling (pooling) factor (#rows, #cols)
"""
assert image_shape[1] == filter_shape[1]
self.input = input
# there are "num input feature maps * filter height * filter width"
# inputs to each hidden unit
fan_in = numpy.prod(filter_shape[1:])
# each unit in the lower layer receives a gradient from:
# "num output feature maps * filter height * filter width" /
# pooling size
fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) /
numpy.prod(poolsize))
# initialize weights with values drawn uniformly from [-W_bound, W_bound]
W_bound = numpy.sqrt(6. / (fan_in + fan_out))
self.W = theano.shared(
numpy.asarray(
rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
dtype=theano.config.floatX
),
borrow=True
)
# the bias is a 1D tensor -- one bias per output feature map
b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
self.b = theano.shared(value=b_values, borrow=True)
# convolve input feature maps with filters
conv_out = conv.conv2d(
input=input,
filters=self.W,
filter_shape=filter_shape,
image_shape=image_shape
)
# downsample each feature map individually, using maxpooling
pooled_out = downsample.max_pool_2d(
input=conv_out,
ds=poolsize,
ignore_border=True
)
# add the bias term. Since the bias is a vector (1D array), we first
# reshape it to a tensor of shape (1, n_filters, 1, 1). Each bias will
# thus be broadcasted across mini-batches and feature map
# width & height
self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
# store parameters of this layer
self.params = [self.W, self.b]
Training works basically the same way as it did for the MLP:
In [ ]:
learning_rate = 0.1
n_epochs = 500
dataset = 'digits.mat'
nkerns = [10, 20]
batch_size = 5000
rng = numpy.random.RandomState(23455)
In [ ]:
# Load the data
print '... loading data'
data = io.loadmat(dataset, squeeze_me=True)
dataIn = data['X']
dataOut = data['y']
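# in digits.mat the digit 0 is stored with label 10; remap those labels to 0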
for i in range(len(dataOut)):
if (dataOut[i] == 10):
dataOut[i] = 0
train_set_x = theano.shared(numpy.asarray(dataIn, dtype=theano.config.floatX),
borrow=True)
train_set_y = T.cast(theano.shared(numpy.asarray(dataOut,
dtype=theano.config.floatX), borrow=True), 'int32')
n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
In [ ]:
index = T.iscalar() # index of the mini-batch to process
x = T.matrix('x') # the input images
y = T.ivector('y') # the corresponding labels; the raw data uses [1..10], with 10 standing
                   # for the digit "0" (already remapped to 0 above)
In [ ]:
print '... building the model'
layer0_input = x.reshape((batch_size, 1, 20, 20))
layer0 = LeNetConvPoolLayer(
rng,
input=layer0_input,
image_shape=(batch_size, 1, 20, 20),
filter_shape=(nkerns[0], 1, 5, 5),
poolsize=(1, 1)
)
layer1 = LeNetConvPoolLayer(
rng,
input=layer0.output,
image_shape=(batch_size, nkerns[0], 16, 16),
filter_shape=(nkerns[1], nkerns[0], 3, 3),
poolsize=(1, 1)
)
layer2_input = layer1.output.flatten(2)
layer2 = CapaOculta(
rng,
input=layer2_input,
n_in=nkerns[1] * 14 * 14,
n_out=500,
activation=T.tanh
)
layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
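The image_shape values above (16x16 after layer0, 14x14 after layer1) and layer2's n_in follow from the 'valid' convolution output size (input - filter + 1) divided by the pooling factor. A quick check with a throwaway helper (not part of the model):
In [ ]:
def conv_pool_out(img_size, filt_size, pool_size):
    # 'valid' convolution shrinks each spatial dimension by (filter - 1);
    # max-pooling with ignore_border=True then divides by the pooling factor
    return (img_size - filt_size + 1) // pool_size

s0 = conv_pool_out(20, 5, 1)       # layer0 output: 16
s1 = conv_pool_out(s0, 3, 1)       # layer1 output: 14
print s0, s1, nkerns[1] * s1 * s1  # 16 14 3920, i.e. layer2's n_in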
In [ ]:
cost = layer3.negative_log_likelihood(y)
params = layer3.params + layer2.params + layer1.params + layer0.params
grads = T.grad(cost, params)
updates = [
(param_i, param_i - learning_rate * grad_i)
for param_i, grad_i in zip(params, grads)
]
train_model = theano.function(
[index],
cost,
updates=updates,
givens={
x: train_set_x[index * batch_size: (index + 1) * batch_size],
y: train_set_y[index * batch_size: (index + 1) * batch_size]
}
)
In [ ]:
print '... training'
start_time = time.clock()
epoch = 0
done_looping = False
while (epoch < n_epochs) and (not done_looping):
epoch = epoch + 1
if (epoch % 100 == 0):
print "Epoca: ", repr(epoch)
for minibatch_index in xrange(n_train_batches):
iter = (epoch - 1) * n_train_batches + minibatch_index
if iter % 100 == 0:
print 'training @ iter = ', iter
cost_ij = train_model(minibatch_index)
end_time = time.clock()
print "Tiempo de ejecucion es de %.2fm" % ((end_time-start_time) / 60.)
In [ ]:
predict = theano.function(
inputs=[index],
outputs=layer3.y_pred,
givens={
x: train_set_x[index * batch_size: (index + 1) * batch_size]
}
)
test = [predict(i) for i
in xrange(n_train_batches)]
real = [dataOut[i * batch_size: (i + 1) * batch_size] for i
        in xrange(n_train_batches)]
print test
print real
In [ ]:
comparacion = map(lambda x, y: x == y, test, real)
count = 0
for i in range(comparacion[0].shape[0]):
    if comparacion[0][i]:
        count += 1
print repr(100. * count / comparacion[0].shape[0]) + "%"
In [ ]:
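# shape of layer0's filter bank: (nkerns[0], 1, 5, 5)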
layer0.W.get_value().shape
In [ ]:
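# shape of layer1's filter bank: (nkerns[1], nkerns[0], 3, 3)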
layer1.W.get_value().shape
In [ ]:
import pylab
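# display one learned 3x3 filter of layer1 (output map 0, input map 0)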
img = numpy.asarray(layer1.W.get_value()[0,0,:,:])
pylab.imshow(img)
pylab.show()
In [ ]:
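# recompile the prediction function to return layer0's feature maps instead of class labels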
predict = theano.function(
inputs=[index],
outputs=layer0.output,
givens={
x: train_set_x[index * batch_size: (index + 1) * batch_size]
}
)
In [ ]:
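# visualize the first feature map that layer0 produces for the first image of batch 0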
img = numpy.asarray(predict(0)[0,0,:,:])
pylab.imshow(img)
pylab.show()
In [ ]:
dataIn.shape
In [ ]:
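# reshape one raw 400-pixel example back into a 20x20 image for display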
img=dataIn[1,:].reshape(20,20)
In [ ]:
pylab.imshow(img)
pylab.show()
In [ ]:
len(dataOut)
In [ ]:
dataOut[20]