This example trains an RNN to create plausible words from a corpus, and it includes lots of interesting "bells and whistles".
The data used for training is a vocabulary list (with frequencies) from the 1-billion-word corpus, loaded below.
For background, adversarial networks applied to images : http://carpedm20.github.io/faces/
Doing this with RNNs on sequential data may be pretty novel : https://www.quora.com/Can-generative-adversarial-networks-be-used-in-sequential-data-in-recurrent-neural-networks-How-effective-would-they-be
In [ ]:
import numpy as np
import theano
import lasagne
#from lasagne.utils import floatX
import pickle
import gzip
import random
import time
WORD_LENGTH_MAX = 16
In [ ]:
# Load an interesting corpus (vocabulary words with frequencies from 1-billion-word-corpus) :
with gzip.open('../data/RNN/ALL_1-vocab.txt.gz', 'rt') as f:   # 'rt' so the lines come back as text, not bytes
lines = [ l.strip().lower().split() for l in f.readlines() ]
lines[0:10]
In [ ]:
# Here are our characters : '[a-z\- ]'
import re
invalid_chars = r'[^a-z\- ]'
lines_valid = [ l for l in lines if not re.search(invalid_chars, l[0]) ]
#lines_valid = lines_valid[0:50000]
lines_valid[0:10], len(lines_valid)
In [ ]:
# /usr/share/dict/linux.words
with open('/usr/share/dict/linux.words','rt') as f:
linux_words = [ l.strip() for l in f.readlines() ]
linux_wordset = set(linux_words)
#'united' in wordset
lines_filtered = [l for l in lines_valid
if len(l[0])>=3 # Require each word to have 3 or more characters
and l[0] in linux_wordset # Require each word to be found in regular dictionary
and len(l[0])<WORD_LENGTH_MAX # And limit length (to avoid crazy roll-out of RNN)
]
lines_filtered[0:10], len(lines_filtered)
In [ ]:
# Split apart the words and their frequencies (these are assumed to be in decreasing frequency order, at least for the initial entries)
words = [ l[0] for l in lines_filtered ]
wordset = set(words)
wordsnp = np.array(words)
freqs_raw = np.array( [ int(l[1]) for l in lines_filtered ] )
freq_tot = float(freqs_raw.sum())
# Frequency weighting adjustments
freqs = freqs_raw / freq_tot
cutoff_index = 30  # All words with higher frequencies will be 'limited' at this level
freqs[0:cutoff_index] = freqs[cutoff_index]
freqs = freqs / freqs.sum()
freqs[0:50]
In [ ]:
test_cum = np.array( [.1, .5, .9, 1.0] )
test_cum.searchsorted([ .05, 0.45, .9, .95])
In [ ]:
# Cumulative frequency, so that we can efficiently pick weighted random words...
# using http://docs.scipy.org/doc/numpy/reference/generated/numpy.searchsorted.html
freqs_cum = freqs.cumsum()
freqs_cum[:10], freqs_cum[-10:],
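As a quick illustration of the weighted sampling this enables (a minimal sketch; the batch_dictionary() helper defined further down does the same thing for whole training batches):
In [ ]:
# Draw a few frequency-weighted random words from the vocabulary
sample_idx = freqs_cum.searchsorted( np.random.uniform( size=(8,) ) )
wordsnp[ sample_idx ].tolist()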
In [ ]:
CHARS_VALID = "abcdefghijklmnopqrstuvwxyz- "
CHARS_SIZE = len(CHARS_VALID)
CHAR_TO_IX = {c: i for i, c in enumerate(CHARS_VALID)}
IX_TO_CHAR = {i: c for i, c in enumerate(CHARS_VALID)}
CHAR_TO_ONEHOT = {c: np.eye(CHARS_SIZE)[i] for i, c in enumerate(CHARS_VALID)}
#CHAR_TO_IX
In [ ]:
# Single letter frequencies
unigram_freq = np.zeros( (CHARS_SIZE,))
idx_end = CHAR_TO_IX[' ']
for i,w in enumerate(words):
word_freq = freqs[i]
for c in w:
unigram_freq[ CHAR_TO_IX[c] ] += word_freq
unigram_freq[ idx_end ] += word_freq
unigram_freq /= unigram_freq.sum()
unigram_freq_cum = unigram_freq.cumsum()
[ (CHARS_VALID[i], "%6.3f" % f) for i,f in enumerate(unigram_freq.tolist()) ]
#CHARS_VALID[ unigram_freq_cum.searchsorted(0.20) ]
In [ ]:
def unigram_word():
s=[]
while True:
idx = np.searchsorted(unigram_freq_cum, np.random.uniform())
c = IX_TO_CHAR[idx]
if c==' ':
if len(s)>0:
break
else:
continue
s.append(c)
return ''.join(s)
' '.join([ unigram_word() for i in range(0,20) ])
In [ ]:
# two-letter frequencies
bigram_freq = np.zeros( (CHARS_SIZE,CHARS_SIZE) )
for i,w in enumerate(words):
w2 = ' '+w+' '
word_freq = freqs[i]
for j in range(0, len(w2)-1):
bigram_freq[ CHAR_TO_IX[ w2[j] ], CHAR_TO_IX[ w2[j+1] ] ] += word_freq
#[ (CHARS_VALID[i], "%6.3f" % f) for i,f in enumerate(bigram_freq[ CHAR_TO_IX['q'] ].tolist()) ]
#bigram_freq.sum(axis=1)[CHAR_TO_IX['q']]
bigram_freq /= bigram_freq.sum(axis=1)[:, np.newaxis]  # [:, np.newaxis] broadcasts the row-sums, so each row is normalised to sum to 1
bigram_freq_cum = bigram_freq.cumsum(axis=1)
#[ (CHARS_VALID[i], "%6.3f" % f) for i,f in enumerate(bigram_freq_cum[ CHAR_TO_IX['q'] ].tolist()) ]
In [ ]:
#bigram_freq.sum(axis=1)[CHAR_TO_IX['q']]
#(bigram_freq/ bigram_freq.sum(axis=1)).sum(axis=0)
#bigram_freq.sum(axis=1)[CHAR_TO_IX['q']]
#bigram_freq[CHAR_TO_IX['q'], :].sum()
#(bigram_freq / bigram_freq.sum(axis=1)[:, np.newaxis]).cumsum(axis=1)
#Letter relative frequency for letters following 'q'
[ (CHARS_VALID[i], "%6.3f" % f) for i,f in enumerate(bigram_freq[ CHAR_TO_IX['q'] ].tolist()) if f>0.001]
#bigram_freq_cum[4]
In [ ]:
def bigram_word():
s=[]
idx_last = CHAR_TO_IX[' ']
while True:
idx = np.searchsorted(bigram_freq_cum[idx_last], np.random.uniform())
c = IX_TO_CHAR[idx]
if c==' ':
if len(s)>0:
#if len(s)<50: continue
break
else:
continue
s.append(c)
idx_last=idx
return ''.join(s)
' '.join([ bigram_word() for i in range(0,20) ])
In [ ]:
# Three-letter frequencies
trigram_freq = np.zeros( (CHARS_SIZE,CHARS_SIZE,CHARS_SIZE) )
for i,w in enumerate(words):
w3 = ' '+w+' '
word_freq = freqs[i]
for j in range(0, len(w3)-2):
trigram_freq[ CHAR_TO_IX[ w3[j] ], CHAR_TO_IX[ w3[j+1] ], CHAR_TO_IX[ w3[j+2] ] ] += word_freq
trigram_freq /= trigram_freq.sum(axis=2)[:, :, np.newaxis]  # broadcast the pair-sums, so each (c1,c2) row is normalised to sum to 1
trigram_freq_cum = trigram_freq.cumsum(axis=2)
[ "ex-%s %6.3f" % (CHARS_VALID[i], f)
for i,f in enumerate(trigram_freq[ CHAR_TO_IX['e'], CHAR_TO_IX['x'] ].tolist()) if f>0.001 ]
In [ ]:
def trigram_word():
s=[]
idx_1 = idx_2 = CHAR_TO_IX[' ']
while True:
idx = np.searchsorted(trigram_freq_cum[idx_1, idx_2], np.random.uniform())
c = IX_TO_CHAR[idx]
if c==' ':
if len(s)>0:
#if len(s)<50: continue
break
else:
continue
s.append(c)
idx_1, idx_2 = idx_2, idx
return ''.join(s)
' '.join([ trigram_word() for i in range(0,20) ])
In [ ]:
sample_size=10000
ngram_hits = [0,0,0]
for w in [ unigram_word() for i in range(0, sample_size) ]:
if w in wordset: ngram_hits[0] += 1
#print("%s %s" % (("YES" if w in wordset else " - "), w, ))
for w in [ bigram_word() for i in range(0, sample_size) ]:
if w in wordset: ngram_hits[1] += 1
#print("%s %s" % (("YES" if w in wordset else " - "), w, ))
for w in [ trigram_word() for i in range(0, sample_size) ]:
if w in wordset: ngram_hits[2] += 1
#print("%s %s" % (("YES" if w in wordset else " - "), w, ))
for i,hits in enumerate(ngram_hits):
print("%d-gram : %4.2f%%" % (i+1, hits*100./sample_size ))
#[ (i,w) for i,w in enumerate(words) if 'mq' in w]
In [ ]:
# Find the distribution of unigrams by sampling (sanity check)
if False:
sample_size=1000
arr=[]
for w in [ unigram_word() for i in range(0, sample_size) ]:
arr.append(w)
s = ' '.join(arr)
s_len = len(s)
for c in CHARS_VALID:
f = len(s.split(c))-1
print("%s -> %6.3f%%" % (c, f*100./s_len))
In [ ]:
BATCH_SIZE = 64
RNN_HIDDEN_SIZE = CHARS_SIZE
GRAD_CLIP_BOUND = 5.0
Instead of a binary 'YES/NO' decision about whether a word is valid (via a lookup in the vocabulary), it may be simpler to train a word-generator if we can assign a probability that a given word is valid.
To do this, let's create a recurrent neural network (RNN) that accepts a (one-hot-encoded) word as input, and (at the end of the sequence) gives us an estimate of the probability that the word is valid.
Actually, rather than discriminate according to whether the word is actually valid, let's 'just' try to decide whether it was produced directly from the dictionary or from the bigram_word() generator.
This can be tested by giving it lists of actual words and lists of words generated by bigram_word(), and seeing whether they can be correctly classified.
The decision about what to do in the (roughly) 12% of cases where the bigram generator happens to produce a valid word can be left until later (since the distribution is so heavily skewed towards producing non-words).
We also need 'batch generator' functions that deliver data in the right format for RNN training :
In [ ]:
def batch_dictionary(size=BATCH_SIZE//2):   # integer division, so 'size' works with range() and slicing
    uniform_vars = np.random.uniform( size=(size,) )
    idx = freqs_cum.searchsorted(uniform_vars)
    return wordsnp[ idx ].tolist()

def batch_bigram(size=BATCH_SIZE//2):
    return [ bigram_word()[0:WORD_LENGTH_MAX] for i in range(size) ]
In [ ]:
# Test it out
#batch_test = lambda : batch_dictionary(size=4)
batch_test = lambda : batch_bigram(size=4)
print(batch_test())
print(batch_test())
print(batch_test())
In [ ]:
# After sampling a data batch, we transform it into a one hot feature representation with a mask
def prep_batch_for_network(batch_of_words):
word_max_length = np.array( [ len(w) for w in batch_of_words ]).max()
# translate into one-hot matrix, mask values and targets
input_values = np.zeros((len(batch_of_words), word_max_length, CHARS_SIZE), dtype='float32')
mask_values = np.zeros((len(batch_of_words), word_max_length), dtype='int32')
for i, word in enumerate(batch_of_words):
for j, c in enumerate(word):
input_values[i,j] = CHAR_TO_ONEHOT[ c ]
mask_values[i, 0:len(word) ] = 1
return input_values, mask_values
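A quick sanity check of the shapes this produces (a minimal sketch using a couple of short words):
In [ ]:
# Expect an input of shape (batch, max_word_length, CHARS_SIZE) and a matching 0/1 mask
test_inputs, test_mask = prep_batch_for_network([ 'cat', 'dogged' ])
print(test_inputs.shape, test_mask.shape)
print(test_mask)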
In [ ]:
# Symbolic variables for input : the one-hot character features, the mask, and the target
# (the GRU layer's initial hidden state is learned inside the layer, so no symbol is needed for it)
disc_input_sym = theano.tensor.tensor3()
disc_mask_sym = theano.tensor.imatrix()
disc_target_sym = theano.tensor.matrix() # probabilities of being from the dictionary (i.e. a single column matrix)
In [ ]:
# Our discriminator has a single GRU layer processing the input sequence, followed by a 1-unit sigmoid decoder.
disc_input = lasagne.layers.InputLayer( (None, None, CHARS_SIZE) ) # batch_size, sequence_len, chars_size
disc_mask  = lasagne.layers.InputLayer( (None, None) )            # batch_size, sequence_len (the mask is 2-dimensional)
disc_rnn1 = lasagne.layers.GRULayer(disc_input,
num_units=RNN_HIDDEN_SIZE,
gradient_steps=-1,
grad_clipping=GRAD_CLIP_BOUND,
hid_init=lasagne.init.Normal(),
learn_init=True,
mask_input=disc_mask,
only_return_final=True, # Only the state at the last timestep is needed
)
disc_decoder = lasagne.layers.DenseLayer(disc_rnn1,
num_units=1,
nonlinearity=lasagne.nonlinearities.sigmoid
)
disc_final = disc_decoder
In [ ]:
# Finally, the output stage
disc_output = lasagne.layers.get_output(disc_final, {
disc_input: disc_input_sym,
disc_mask: disc_mask_sym,
}
)
In [ ]:
disc_loss = theano.tensor.nnet.binary_crossentropy(disc_output, disc_target_sym).mean()
In [ ]:
# For stability during training, gradients are clipped inside the GRU layer (grad_clipping); an explicit clip and total-norm constraint are also available below (commented out)
#MAX_GRAD_NORM = 15
disc_params = lasagne.layers.get_all_params(disc_final, trainable=True)
disc_grads = theano.tensor.grad(disc_loss, disc_params)
#disc_grads = [theano.tensor.clip(g, -GRAD_CLIP_BOUND, GRAD_CLIP_BOUND) for g in disc_grads]
#disc_grads, disc_norm = lasagne.updates.total_norm_constraint( disc_grads, MAX_GRAD_NORM, return_norm=True)
disc_updates = lasagne.updates.adam(disc_grads, disc_params)
disc_train = theano.function([disc_input_sym, disc_target_sym, disc_mask_sym], # , disc_rnn1_t0_sym
[disc_loss], # , disc_output, norm, hid_out_last, hid2_out_last
updates=disc_updates,
)
disc_predict = theano.function([disc_input_sym, disc_mask_sym], [disc_output])
print("Discriminator network functions defined")
In [ ]:
t0, iterations_complete = time.time(), 0
In [ ]:
epochs = 10*1000
t1, iterations_recent = time.time(), iterations_complete
for epoch_i in range(epochs):
# create a batch of words : half are dictionary, half are from bigram
batch_of_words = batch_dictionary() + batch_bigram()
# get the one-hot input values and corresponding mask matrix
disc_input_values, disc_mask_values = prep_batch_for_network(batch_of_words)
    # and here are the associated target values
    disc_target_values = np.zeros((len(batch_of_words),1), dtype='float32')
    disc_target_values[ 0:(BATCH_SIZE//2), 0 ] = 1.0   # First half are dictionary values
    for i, word in enumerate(batch_of_words):
        if i >= BATCH_SIZE//2 and word in wordset:
            disc_target_values[ i , 0 ] = 1.0          # bigram has hit a dictionary word by luck...
# Now train the discriminator RNN
disc_loss_, = disc_train(disc_input_values, disc_target_values, disc_mask_values)
#disc_output_, = disc_predict(disc_input_values, disc_mask_values)
iterations_complete += 1
if iterations_complete % 250 == 0:
secs_per_batch = float(time.time() - t1)/ (iterations_complete - iterations_recent)
eta_in_secs = secs_per_batch*(epochs-epoch_i)
print("Iteration {:5d}, loss_train: {:.4f} ({:.1f}s per 1000 batches) eta: {:.0f}m{:02.0f}s".format(
iterations_complete, float(disc_loss_),
secs_per_batch*1000., np.floor(eta_in_secs/60), np.floor(eta_in_secs % 60)
))
#print('Iteration {}, output: {}'.format(iteration, disc_output_, )) # , output: {}
t1, iterations_recent = time.time(), iterations_complete
print('Iteration {}, ran in {:.1f}sec'.format(iterations_complete, float(time.time() - t0)))
In [ ]:
disc_param_values = lasagne.layers.get_all_param_values(disc_final)
disc_param_dictionary = dict(
params = disc_param_values,
CHARS_VALID = CHARS_VALID,
CHAR_TO_IX = CHAR_TO_IX,
IX_TO_CHAR = IX_TO_CHAR,
)
#pickle.dump(disc_param_dictionary, open('../data/RNN/disc_trained.pkl','wb'), protocol=pickle.HIGHEST_PROTOCOL)
In [ ]:
disc_param_dictionary = pickle.load(open('../data/RNN/disc_trained_64x310k.pkl', 'rb'))
lasagne.layers.set_all_param_values(disc_final, disc_param_dictionary['params'])
In [ ]:
test_text_list = ["shape", "shast", "shaes", "shafg", "shaqw"]
test_text_list = ["opposite", "aposite", "apposite", "xposite", "rrwqsite", "deposit", "idilic", "idyllic"]
In [ ]:
disc_input_values, disc_mask_values = prep_batch_for_network(test_text_list)
disc_output_, = disc_predict(disc_input_values, disc_mask_values)
for i,v in enumerate(disc_output_.tolist()):
print("%s : %5.2f%%" % ((test_text_list[i]+' '*20)[:20], v[0]*100.))
Next, let's build an RNN that produces text, and train it using (a) a pure dictionary look-up, and (b) the correctness signal from the Discriminator above.
Plan of attack : generate each word one character at a time, re-running the RNN on the growing prefix at every step (sketched below).
This seems very inefficient (since the first RNN steps are being re-run multiple times on the same starting letters), but it is the same approach as in https://github.com/Lasagne/Recipes/blob/master/examples/lstm_text_generation.py
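Schematically, the roll-out loop looks like the sketch below. Here next_char_probs() and roll_out_word() are hypothetical stand-ins (the bigram model plays the role of the network); the real RNN version, which re-feeds the whole prefix at each step, is generate_rnn_words(), defined further down.
In [ ]:
# Sketch of the incremental roll-out : at every step the whole prefix is re-processed
def next_char_probs(prefix):   # stand-in for the network : just the bigram distribution after the last character
    idx_last = CHAR_TO_IX[ prefix[-1] ] if len(prefix)>0 else CHAR_TO_IX[' ']
    return bigram_freq[ idx_last ]

def roll_out_word(max_len=WORD_LENGTH_MAX):
    prefix = ''
    while len(prefix) < max_len:
        probs = next_char_probs(prefix)    # recomputed from scratch on the growing prefix
        idx = np.searchsorted(probs.cumsum(), np.random.uniform())
        if IX_TO_CHAR[idx] == ' ':
            break                          # a space ends the word
        prefix += IX_TO_CHAR[idx]
    return prefix

' '.join([ roll_out_word() for _ in range(10) ])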
In [ ]:
# Let's pre-calculate the logs of the bigram frequencies, since they may be mixed in below
bigram_min_freq = 1e-10  # To prevent log(0) = -inf
bigram_freq_log = np.log( bigram_freq + bigram_min_freq ).astype('float32')
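To see what the mixing does, here is a toy NumPy illustration (outside the Theano graph) of blending these log-frequencies with some made-up RNN logits before a softmax, in the same way the generator below uses bigram_overlay:
In [ ]:
# Toy illustration : mix log-bigram-frequencies with (made-up) RNN logits, then softmax
def softmax_np(x):
    e = np.exp(x - x.max())
    return e / e.sum()

fake_rnn_logits = np.zeros( (CHARS_SIZE,), dtype='float32')  # a 'know-nothing' RNN : uniform over characters
row_q = bigram_freq_log[ CHAR_TO_IX['q'] ]                   # log-frequencies of letters following 'q'
for overlay in [0.0, 0.5, 1.0]:
    probs = softmax_np( overlay*row_q + (1.0-overlay)*fake_rnn_logits )
    top = np.argsort(probs)[::-1][:3]
    print(overlay, [ (IX_TO_CHAR[i], round(float(probs[i]), 3)) for i in top ])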
In [ ]:
# Symbolic variables for input. In addition to the usual features and target,
gen_input_sym = theano.tensor.ftensor3()
gen_mask_sym = theano.tensor.imatrix()
gen_words_target_sym = theano.tensor.imatrix() # characters generated (as character indices)
# probabilities of being from the dictionary (i.e. a single column matrix)
gen_valid_target_sym = theano.tensor.fmatrix( )
# This is a single mixing parameter (0.0 = pure RNN, 1.0=pure Bigram)
gen_bigram_overlay = theano.tensor.fscalar()
# This is 'current' since it reflects the bigram field as far as it is known during the call
gen_bigram_freq_log_field = theano.tensor.ftensor3()
In [ ]:
gen_input = lasagne.layers.InputLayer( (None, None, CHARS_SIZE) ) # batch_size, sequence_len, chars_size
gen_mask  = lasagne.layers.InputLayer( (None, None) )             # batch_size, sequence_len (the mask is 2-dimensional)
#gen_rnn1_t0 = lasagne.layers.InputLayer( (None, RNN_HIDDEN_SIZE) ) # batch_size, RNN_hidden_size=chars_size
#n_batch, n_time_steps, n_features = gen_input.input_var.shape
n_batch, n_time_steps, n_features = gen_input_sym.shape
gen_rnn1 = lasagne.layers.GRULayer(gen_input,
num_units=RNN_HIDDEN_SIZE,
gradient_steps=-1,
grad_clipping=GRAD_CLIP_BOUND,
#hid_init=disc_rnn1_t0,
hid_init=lasagne.init.Normal(),
learn_init=True,
mask_input=gen_mask,
only_return_final=False, # Need all of the output states
)
# Before the decoder layer, we need to reshape the sequence into the batch dimension,
# so that timesteps are decoded independently.
gen_reshape = lasagne.layers.ReshapeLayer(gen_rnn1, (-1, RNN_HIDDEN_SIZE) )
gen_prob_raw = lasagne.layers.DenseLayer(gen_reshape,
num_units=CHARS_SIZE,
nonlinearity=lasagne.nonlinearities.linear # No squashing (yet)
)
gen_prob = lasagne.layers.ReshapeLayer(gen_prob_raw, (-1, n_time_steps, CHARS_SIZE))
gen_prob_theano = lasagne.layers.get_output(gen_prob, {
gen_input: gen_input_sym,
gen_mask: gen_mask_sym,
})
gen_prob_mix = gen_bigram_overlay*gen_bigram_freq_log_field + (1.0-gen_bigram_overlay)*gen_prob_theano
gen_prob_mix_flattened = theano.tensor.reshape(gen_prob_mix, (-1, CHARS_SIZE))
gen_prob_softmax_flattened = theano.tensor.nnet.nnet.softmax(gen_prob_mix_flattened)
#gen_prob_final = lasagne.layers.SliceLayer(gen_prob_raw, indices=(-1), axis=1)
In [ ]:
# Finally, the output stage - this is for the training (over all the letters in the words)
#gen_output = gen_prob_softmax_flattened
# And for prediction (which is done incrementally, adding one letter at a time)
gen_output_last = gen_prob_softmax_flattened.reshape( (-1, n_time_steps, CHARS_SIZE) )[:, -1]
In [ ]:
# The generative network is trained by encouraging the outputs across time to match the given sequence of letters
# We flatten the sequence into the batch dimension before calculating the loss
#def gen_word_cross_ent(net_output, targets):
# preds_raw = theano.tensor.reshape(net_output, (-1, CHARS_SIZE))
# preds_softmax = theano.tensor.nnet.nnet.softmax(preds_raw)
# targets_flat = theano.tensor.flatten(targets)
# cost = theano.tensor.nnet.categorical_crossentropy(preds_softmax, targets_flat)
# return cost
targets_flat = theano.tensor.flatten(gen_words_target_sym)
gen_cross_entropy_flat = theano.tensor.nnet.categorical_crossentropy(gen_prob_softmax_flattened, targets_flat)
gen_cross_entropy = theano.tensor.reshape(gen_cross_entropy_flat, (-1, n_time_steps) )
gen_loss_weighted = theano.tensor.dot( gen_valid_target_sym.T, gen_cross_entropy )
gen_loss = gen_loss_weighted.mean()
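The weighting above means each word's per-character cross-entropies are scaled by that word's entry in gen_valid_target_sym before averaging, so the sign of the entry determines whether minimising gen_loss pulls the generated characters' probabilities up or pushes them down. A toy NumPy illustration of the same arithmetic, with made-up numbers:
In [ ]:
# Made-up numbers : per-character cross-entropy for 2 words of 3 characters each
ce_example     = np.array([[0.7, 1.2, 0.4],   # word 0
                           [0.9, 0.3, 1.5]])  # word 1
weight_example = np.array([[ 0.5],            # word 0 : positive weight -> its cross-entropy is minimised
                           [-0.5]])           # word 1 : negative weight -> its cross-entropy is pushed up
print( weight_example.T.dot(ce_example) )         # same shape trick as gen_loss_weighted above
print( weight_example.T.dot(ce_example).mean() )  # corresponds to gen_loss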
In [ ]:
# For stability during training, gradients are clipped inside the GRU layer (grad_clipping); an explicit clip and total-norm constraint are also available below (commented out)
#MAX_GRAD_NORM = 15
gen_predict = theano.function([gen_input_sym,
gen_bigram_overlay, gen_bigram_freq_log_field,
gen_mask_sym], [gen_output_last])
gen_params = lasagne.layers.get_all_params(gen_prob, trainable=True)
gen_grads = theano.tensor.grad(gen_loss, gen_params)
#gen_grads = [theano.tensor.clip(g, -GRAD_CLIP_BOUND, GRAD_CLIP_BOUND) for g in gen_grads]
#gen_grads, gen_norm = lasagne.updates.total_norm_constraint( gen_grads, MAX_GRAD_NORM, return_norm=True)
gen_updates = lasagne.updates.adam(gen_grads, gen_params)
gen_train = theano.function([gen_input_sym,
gen_bigram_overlay, gen_bigram_freq_log_field,
gen_words_target_sym, gen_valid_target_sym,
gen_mask_sym],
[gen_loss],
updates=gen_updates,
)
gen_debug = theano.function([gen_input_sym,
gen_bigram_overlay, gen_bigram_freq_log_field,
gen_words_target_sym, gen_valid_target_sym,
gen_mask_sym],
[gen_cross_entropy],
on_unused_input='ignore'
)
print("Generator network functions defined")
The network above can be used to generate text...
The following set-up allows the output of the RNN at each timestep to be mixed with the letter frequencies that the bigram model would suggest, in a proportion bigram_overlay which can vary from 0.0 (solely RNN-derived) to 1.0 (solely bigram frequencies, with the RNN output disregarded).
The input is a 'random field' matrix that is used to choose each letter in each slot from the generated probability distribution.
Once a space is output for a specific word, then it stops being extended (equivalently, the mask is set to zero going forwards).
Once spaces have been observed for all words (or the maximum length reached), the process ends, and a list of the created words is returned.
In [ ]:
def generate_rnn_words(random_field, bigram_overlay=0.0):
batch_size, max_word_length = random_field.shape
idx_spc = CHAR_TO_IX[' ']
def append_indices_as_chars(words_current, idx_list):
for i, idx in enumerate(idx_list):
if idx == idx_spc:
pass # Words end at space
#words_current[i] += 'x'
else:
words_current[i] += IX_TO_CHAR[idx]
return words_current
# Create a 'first character' by using the bigram transitions from 'space' (this is fair)
idx_initial = [ np.searchsorted(bigram_freq_cum[idx_spc], random_field[i, 0]) for i in range(batch_size) ]
bigram_freq_log_field = np.zeros( (batch_size, max_word_length, CHARS_SIZE), dtype='float32')
bigram_freq_log_field[:,0] = bigram_freq_log[ np.array(idx_initial) , :]
words_current = [ '' for _ in range(batch_size) ]
words_current = append_indices_as_chars(words_current, idx_initial)
col = 1
while True:
gen_input_values, gen_mask_values = prep_batch_for_network(words_current)
#print(gen_mask_values[:,-1])
#gen_out_, = gen_predict(gen_input_values, gen_mask_values)
if gen_input_values.shape[1]<col: # Early termination
print("Early termination")
col -= 1
break
#print(gen_input_values.shape, gen_mask_values.shape, bigram_freq_log_field.shape, col)
probs, = gen_predict(gen_input_values, bigram_overlay, bigram_freq_log_field[:,0:col], gen_mask_values)
#print(probs[0])
# This output is the final probability[CHARS_SIZE], so let's cumsum it, etc.
probs_cum = probs.cumsum(axis=1)
idx_next = [ # Only add extra letters if we haven't already passed a space (i.e. mask[-1]==0)
idx_spc if gen_mask_values[i,-1]==0 else np.searchsorted(probs_cum[i], random_field[i, col])
for i in range(batch_size)
]
words_current = append_indices_as_chars(words_current, idx_next)
words_current_max_length = np.array( [ len(w) for w in words_current ]).max()
# If the words have reached the maximum length, or we didn't extend any of them...
if words_current_max_length>=max_word_length: # Finished
col += 1
break
# Guarded against overflow on length...
bigram_freq_log_field[:, col] = bigram_freq_log[ np.array(idx_next) , :]
col += 1
return words_current, bigram_freq_log_field[:,0:col]
def view_rnn_generator_sample_output(bigram_overlay=0.9):
    # Create uniform random draws for every potential character position in the output 'field'
random_field = np.random.uniform( size=(BATCH_SIZE, WORD_LENGTH_MAX) )
gen_words_output, _underlying_bigram_field = generate_rnn_words(random_field, bigram_overlay=bigram_overlay)
print( '\n'.join(gen_words_output))
#print(_underlying_bigram_field)
In [ ]:
view_rnn_generator_sample_output(bigram_overlay=0.0)
In [ ]:
view_rnn_generator_sample_output(bigram_overlay=0.9)
In [ ]:
gen_param_values_initial = lasagne.layers.get_all_param_values(gen_prob)
In [ ]:
def is_good_output_dictionary(output_words):
return np.array(
[ (1.0 if w in wordset else 0.0) for w in output_words ],
dtype='float32'
)
t0, iterations_complete = time.time(), 0
def reset_generative_network():
global t0, iterations_complete
t0, iterations_complete = time.time(), 0
lasagne.layers.set_all_param_values(gen_prob, gen_param_values_initial)
def prep_batch_for_network_output(mask_values, batch_of_words):
output_indices = np.zeros(mask_values.shape, dtype='int32')
for i, word in enumerate(batch_of_words):
word_shifted = word[1:]+' '
for j, c in enumerate(word_shifted):
output_indices[i,j] = CHAR_TO_IX[ c ]
return output_indices
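A quick check of the shifted targets (a minimal sketch): for 'cat' the network inputs are 'c','a','t' and the target at each step is the next character, i.e. 'a','t',' '.
In [ ]:
# Targets are the input characters shifted left by one, padded with ' '
example_words = ['cat', 'dog']
_inputs, example_mask = prep_batch_for_network(example_words)
example_targets = prep_batch_for_network_output(example_mask, example_words)
print(example_targets)
print([ ''.join( IX_TO_CHAR[ix] for ix in row ) for row in example_targets ])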
In [ ]:
def train_generative_network(is_good_output_function=is_good_output_dictionary, epochs=10*1000, bigram_overlay=0.0):
if bigram_overlay>=1.0:
print("Cannot train with pure bigrams...")
return
global t0, iterations_complete
t1, iterations_recent = time.time(), iterations_complete
for epoch_i in range(epochs):
random_field = np.random.uniform( size=(BATCH_SIZE, WORD_LENGTH_MAX) )
gen_words_output, underlying_bigram_field = generate_rnn_words(random_field, bigram_overlay=bigram_overlay)
#print(gen_words_output[0])
#print(underlying_bigram_field[0])
# Now, create a training set of input -> output, coupled with an intensity signal
# first the step-by-step network inputs
gen_input_values, gen_mask_values = prep_batch_for_network(gen_words_output)
        # now create step-by-step network outputs (strip off first character, add spaces) as *indices*
gen_output_values_int = prep_batch_for_network_output(gen_mask_values, gen_words_output)
#print(gen_output_values_int.shape, underlying_bigram_field.shape)
#print(gen_output_values_int[0]) # makes sense
# And, since we have a set of words, we can also determine their 'goodness'
is_good_output = is_good_output_function(gen_words_output)
#print(is_good_output[0]) Starts at all zero. i.e. the word[0] is bad
# This looks like it is the wrong way 'round...
target_valid_row = -(np.array(is_good_output) - 0.5)
## i.e. higher values for more-correct symbols : This goes -ve, and wrong, quickly
#target_valid_row = (np.array(is_good_output) - 0.5)
#target_valid_row = np.ones( (gen_mask_values.shape[0],), dtype='float32' )
target_valid = target_valid_row[:, np.newaxis]
#print(target_valid.shape)
if False:
# Now debug the generator RNN
gen_debug_, = gen_debug(gen_input_values,
bigram_overlay, underlying_bigram_field,
gen_output_values_int, target_valid,
gen_mask_values)
print(gen_debug_.shape)
print(gen_debug_[0])
#return
# Now train the generator RNN
gen_loss_, = gen_train( gen_input_values,
bigram_overlay, underlying_bigram_field,
gen_output_values_int, target_valid,
gen_mask_values)
#print(gen_loss_)
# Hmm - this loss is ~ a character-level loss, and isn't comparable to a word-level score,
# which is a pity, since the 'words' seem to get worse, not better...
iterations_complete += 1
if iterations_complete % 10 == 0:
secs_per_batch = float(time.time() - t1)/ (iterations_complete - iterations_recent)
eta_in_secs = secs_per_batch*(epochs-epoch_i)
print("Iteration {:5d}, loss_train: {:.2f} word-score: {:.2f}% ({:.1f}s per 1000 batches) eta: {:.0f}m{:02.0f}s".format(
iterations_complete, float(gen_loss_),
float(is_good_output.mean())*100.,
secs_per_batch*1000., np.floor(eta_in_secs/60), np.floor(eta_in_secs % 60), )
)
print( ' '.join(gen_words_output[:10]) )
#print('Iteration {}, output: {}'.format(iteration, disc_output_, )) # , output: {}
t1, iterations_recent = time.time(), iterations_complete
print('Iteration {}, ran in {:.1f}sec'.format(iterations_complete, float(time.time() - t0)))
In [ ]:
#theano.config.exception_verbosity='high' # ... a little pointless with RNNs
# See: http://deeplearning.net/software/theano/tutorial/debug_faq.html
reset_generative_network()
In [ ]:
train_generative_network(is_good_output_function=is_good_output_dictionary, epochs=1*1000, bigram_overlay=0.9)
In [ ]:
view_rnn_generator_sample_output(bigram_overlay=0.9)
In [ ]:
#def is_good_output_dictionary(output_words):
# return np.array(
# [ (1.0 if w in wordset else 0.0) for w in output_words ],
# dtype='float32'
# )
def is_good_output_discriminator(output_words):
disc_input_values, disc_mask_values = prep_batch_for_network(output_words)
disc_output_, = disc_predict(disc_input_values, disc_mask_values)
return disc_output_.reshape( (-1,) )
In [ ]:
reset_generative_network()
train_generative_network(is_good_output_function=is_good_output_discriminator, epochs=1*1000, bigram_overlay=0.9)
#train_generative_network(is_good_output_function=is_good_output_dictionary, epochs=1*1000, bigram_overlay=0.9)
In [ ]:
view_rnn_generator_sample_output(bigram_overlay=0.9)