In [1]:
# reveal.js presentation configuration
from notebook.services.config import ConfigManager

slideshow_options = {
    'theme': 'league',
    'transition': 'fade',
    'center': 'false',
    'overview': 'true',
    'start_slideshow_at': 'selected',
}
cm = ConfigManager()
cm.update('livereveal', slideshow_options)
# imports
import theano
from theano import tensor
import codecs
import numpy
import sys
from blocks import initialization
from blocks import roles
from blocks.model import Model
from blocks.bricks import Linear, NDimensionalSoftmax
from blocks.bricks.parallel import Fork
from blocks.bricks.recurrent import GatedRecurrent
from blocks.bricks.lookup import LookupTable
from blocks.filter import VariableFilter
from blocks.serialization import load_parameters
from blocks.bricks import NDimensionalSoftmax
Fabio A. González, Universidad Nacional de Colombia
In [2]:
# Load training file to get vocabulary
text_file = 'biblia.txt' # input file
with codecs.open(text_file, 'r', 'utf-8') as f:
data = f.read()
chars = list(set(data))
vocab_size = len(chars)
char_to_ix = {ch: i for i, ch in enumerate(chars)}
ix_to_char = {i: ch for i, ch in enumerate(chars)}
print "Total number of chars:", len(data)
print "Vocabulary size:", vocab_size
In [3]:
# Sanity check: show a 1000-character excerpt of the raw corpus
print data[21000:22000]
In [4]:
# Define the model structure
embedding_size = 256  # number of hidden units per layer

# Input: map each character index to a dense embedding vector
lookup = LookupTable(length=vocab_size, dim=embedding_size)

# Layer 1: the Fork produces the GRU input and its gate pre-activations
fork1 = Fork(output_names=['linear1', 'gates1'],
             input_dim=embedding_size,
             output_dims=[embedding_size, embedding_size * 2],
             name='fork1')
grnn1 = GatedRecurrent(dim=embedding_size, name='grnn1')

# Layer 2: same fork + GRU pattern, stacked on layer 1
fork2 = Fork(output_names=['linear2', 'gates2'],
             input_dim=embedding_size,
             output_dims=[embedding_size, embedding_size * 2],
             name='fork2')
grnn2 = GatedRecurrent(dim=embedding_size, name='grnn2')

# Softmax layer: hidden state -> vocabulary-sized scores
hidden_to_output = Linear(name='hidden_to_output',
                          input_dim=embedding_size,
                          output_dim=vocab_size)
softmax = NDimensionalSoftmax()
In [5]:
# Propagate x until top brick to get y_hat predictions
x = tensor.imatrix('features')  # input char indices; fed as (sequence, batch) — see init_params' expand_dims
y = tensor.imatrix('targets')   # output (next-character indices)
embedding = lookup.apply(x)
# Layer 1: fork produces the GRU input and its gate pre-activations
linear1, gates1 = fork1.apply(embedding)
h1 = grnn1.apply(linear1, gates1)
h1.name = 'h1'
# Layer 2 stacked on the hidden states of layer 1
linear2, gates2 = fork2.apply(h1)
h2 = grnn2.apply(linear2, gates2)
h2.name = 'h2'
# Project hidden states to vocabulary-sized scores
linear3 = hidden_to_output.apply(h2)
linear3.name = 'linear3'
y_hat = softmax.apply(linear3, extra_ndim=1)
y_hat.name = 'y_hat'
# COST: mean categorical cross-entropy computed from the raw scores
cost = softmax.categorical_cross_entropy(y, linear3, extra_ndim=1).mean()
cost.name = 'cost'
model = Model(cost)
In [87]:
# Load pre-trained parameter values from disk, keeping only the values
# whose names match a parameter of the current model.
# Binary mode ('rb') is required: the file is a serialized archive, not
# text, and text mode corrupts it on some platforms.
with open('grnn_best.tar', 'rb') as model_file:
    # Use a set for O(1) membership tests instead of scanning a key list.
    model_params = set(model.get_parameter_dict().keys())
    param_vals = {k: v for k, v in load_parameters(model_file).iteritems()
                  if k in model_params}
    model.set_parameter_values(param_vals)
In [8]:
# Define Theano graph
# NOTE(review): this assumes model.inputs is ordered [targets, features];
# the ordering of model.inputs is not guaranteed by the visible code — confirm.
y, x = model.inputs
# This rebinds `softmax`, shadowing the brick used to build the cost graph;
# the fresh brick is only used to turn the stored scores into probabilities.
softmax = NDimensionalSoftmax()
# Recover the pre-softmax output by name from the model's variables.
linear_output = [v for v in model.variables if v.name == 'linear3'][0]
y_hat = softmax.apply(linear_output, extra_ndim=1)
# predict: char-index matrix -> per-step probability distributions
predict = theano.function([x], y_hat)
#theano.printing.pydotprint(predict, outfile="theano_graph.svg", format = 'svg', var_with_name_simple=True)
In [ ]:
# Take activations of the last element of each GRU layer.
activations = [h1[-1].flatten(), h2[-1].flatten()]
# NOTE(review): assumes the last parameter of a GatedRecurrent brick is its
# initial state — confirm against the Blocks GatedRecurrent implementation.
initial_states = [grnn1.parameters[-1], grnn2.parameters[-1]]
# Symbolic placeholders so the caller can feed the hidden states explicitly.
states_as_params = [tensor.vector(dtype=initial.dtype) for initial in initial_states]
# Get prob. distribution of the last element in the last seq of the batch;
# `givens` substitutes the stored initial states with the placeholders, so
# fprop(x, state1, state2) resumes from the supplied states.
fprop = theano.function([x] + states_as_params, activations + [y_hat[-1, -1, :]], givens=zip(initial_states, states_as_params))
In [12]:
def sample(x_curr, states_values, fprop, temperature=1.0):
    '''
    Propagate the x_curr sequence through fprop and pick the next element.

    With probability `temperature` the element is drawn from the predicted
    distribution; otherwise the argmax is taken.

    Return: sampled index, the list of hidden activations produced by
    fprop, and the probability of the sampled index.
    '''
    outputs = fprop(x_curr, *states_values)
    # The last output of fprop is the probability distribution; the rest
    # are the hidden activations.
    distribution = outputs.pop().astype('float64')
    distribution /= distribution.sum()  # renormalize after the float64 cast
    draw_randomly = numpy.random.binomial(1, temperature) == 1
    if draw_randomly:
        chosen = numpy.random.multinomial(1, distribution).nonzero()[0][0]
    else:
        chosen = distribution.argmax()
    return chosen, outputs, distribution[chosen]
def init_params(primetext=u''):
    '''
    Build the initial input sequence and hidden-state values for sampling.

    primetext -- seed text; characters not in the vocabulary are dropped.
        When empty, a single random vocabulary character is used instead.

    Return: (x_curr, states_values) where x_curr is a (len, 1) uint8 array
    of character indices and states_values holds the stored initial hidden
    state of each GRU layer.

    Raises Exception when no character of primetext is in the vocabulary.
    '''
    # `not primetext` already covers both u'' and None; the extra
    # len(primetext) == 0 test in the original was redundant.
    if not primetext:
        primetext = ix_to_char[numpy.random.randint(vocab_size)]
    # Dict membership is O(1); the original `ch in char_to_ix.keys()`
    # built a list and scanned it for every character.
    primetext = ''.join(ch for ch in primetext if ch in char_to_ix)
    if len(primetext) == 0:
        raise Exception('primetext characters are not in the vocabulary')
    x_curr = numpy.expand_dims(
        numpy.array([char_to_ix[ch] for ch in primetext], dtype='uint8'),
        axis=1)
    states_values = [initial.get_value() for initial in initial_states]
    return x_curr, states_values
def stochastic_sampling(length, primetext=u'', temperature=1.0):
    '''
    Print `length` characters generated one at a time by temperature
    sampling (see sample()), seeded with primetext.
    '''
    next_input, hidden = init_params(primetext)
    sys.stdout.write('Starting sampling\n' + primetext)
    generated = 0
    while generated < length:
        idx, hidden, _ = sample(next_input, hidden, fprop, temperature)
        sys.stdout.write(ix_to_char[idx])
        # Feed the sampled character back in as the next one-step input.
        next_input = [[idx]]
        generated += 1
    sys.stdout.write('\n')
def beam_sampling(length, primetext=u'', beam_size=5, temperature=1.0):
    '''
    Generate `length` characters while keeping `beam_size` candidate
    sequences alive.

    At each step every beam proposes `beam_size` stochastic continuations
    via sample(); the best-scoring continuation is kept per beam.
    NOTE(review): this keeps one winner per beam rather than the global
    top-k of all beam_size**2 candidates, so it is a greedy per-beam
    variant of beam search — confirm this is intended.

    Return: (logprobs, seqs) — accumulated log-probability of each beam
    and an array of shape (len(primetext)+length, beam_size) with the
    character indices of each beam.
    '''
    x_curr, states_values = init_params(primetext)
    # One copy of the primed input and state per beam.
    inputs = [x_curr] * beam_size
    states = [states_values] * beam_size
    logprobs = numpy.zeros((beam_size, 1))
    # Pre-fill the prime text into every beam's sequence.
    seqs = numpy.zeros((length + x_curr.shape[0], beam_size))
    seqs[0:x_curr.shape[0], :] = numpy.repeat(x_curr, beam_size, axis=1)
    for k in range(length):
        # probs[i][j], indices[i][j], hstates[i][j]: j-th candidate
        # proposed from beam i (probability, char index, hidden states).
        probs = numpy.zeros((beam_size, beam_size))
        indices = numpy.zeros((beam_size, beam_size), dtype='int32')
        hstates = numpy.empty((beam_size, beam_size), dtype=list)
        for i in range(beam_size):
            for j in range(beam_size):
                indices[i][j], hstates[i][j], probs[i][j] = sample(inputs[i], states[i], fprop, temperature)
        # Accumulate log-probabilities, then keep the best candidate per beam.
        probs = numpy.log(probs) + logprobs
        best_idx = probs.argmax(axis=1)
        inputs = [[[idx]] for idx in indices[range(beam_size), best_idx]]
        states = [hs for hs in hstates[range(beam_size), best_idx]]
        logprobs = probs[range(beam_size), best_idx].reshape((beam_size, 1))
        seqs[k + x_curr.shape[0], :] = numpy.array(inputs).flatten()
    return logprobs.flatten(), numpy.array(seqs).squeeze()
In [93]:
logprobs, seqs = beam_sampling(100, primetext=u'blanco ', beam_size = 7, temperature = 1.0)
for i in logprobs.flatten().argsort()[::-1]:
print 'log P(s) = {0:3.3f}. Sample: '.format(logprobs.flatten()[i]) + u''.join([ix_to_char[ix] for ix in numpy.array(seqs).squeeze()[:,i]])
print '~' * 50
Temperature sampling (pseudocode):
for i in 1..n:
    P = predict_next()
    bin_var = sample_binomial(temperature)
    if bin_var:
        w_i = sample_multinomial(P)
    else:
        w_i = P.argmax()
In [77]:
# Generate 3000 characters; temperature=0.3 means the argmax character is
# taken 70% of the time (see sample()), so the output is mostly greedy.
stochastic_sampling(3000, primetext=u'El sentido de la vida es', temperature=0.3)
In [61]:
# Function to calculate the probability of a text
def log_likelihood(text):
    '''
    Sum of the log-probabilities the model assigns to each character of
    `text` given the preceding characters. Characters outside the
    vocabulary are dropped before scoring.
    '''
    in_vocab = [ch for ch in text if ch in char_to_ix]
    indices = numpy.array([char_to_ix[ch] for ch in in_vocab], dtype='uint8')
    x_curr = numpy.expand_dims(indices, axis=1)
    probs = predict(x_curr).squeeze()
    # probs[i] is the distribution after seeing character i, so it is
    # scored against character i+1.
    targets = x_curr[1:].flatten()
    return sum(numpy.log(probs[i, c]) for i, c in enumerate(targets))
In [78]:
# Spanish phrase — in-domain for a model trained on biblia.txt
log_likelihood("buscad, y hallaréis")
Out[78]:
In [79]:
# English phrase — presumably out-of-domain, so expect a lower score
log_likelihood("this is a test")
Out[79]:
In [84]:
from itertools import permutations
bow = [' ', 'hombre', 'ama', 'a', 'el']
perms = [' '.join(perm) for perm in permutations(bow)]
for p, t in sorted([(-log_likelihood(text),text) for text in perms])[:20]:
print p, t
In [85]:
perms = [' '.join(perm) for perm in permutations(bow)]
for p, t in sorted([(-log_likelihood(text),text) for text in perms])[-20:]:
print p, t
In [66]:
from itertools import permutations
from random import shuffle
In [86]:
text = list(u'mnpu')
perms = [''.join(perm) for perm in permutations(text)]
for p, t in sorted([(-log_likelihood(text),text) for text in perms])[:5]:
print p, t
print "------------------"
for p, t in sorted([(-log_likelihood(text),text) for text in perms])[-5:]:
print p, t
In [24]:
# stochastic_sampling writes its output as it generates and returns None,
# so wrapping the call in `print` only appended a spurious "None" line.
stochastic_sampling(400, u"(Lc. ", temperature=0.1)