Corpus: The Critique of Pure Reason by Immanuel Kant http://www.gutenberg.org/files/4280/4280.txt
In [ ]:
lines = []
with open("corpera/kant.txt", 'r') as f:
for line in f:
line = line.strip().lower()
line = line.decode("ascii", "ignore")
if (len(line)==0):
continue
lines.append(line)
In [ ]:
lines = lines[17:] # skip the first few lines of the file which are not part of the book
In [ ]:
text = " ".join(lines) # turns the book into a stream of characters
In [ ]:
text[:100]
In [ ]:
len(text)
Turn text into stream of characters and create lookup tables
In [ ]:
discard_chars = set('0123456789%=@$&+*[]<>')
In [ ]:
text = [c for c in text if c not in discard_chars]
In [ ]:
len(text)
In [ ]:
chars = set(text)
nb_chars = len(chars)
char2index = dict((c,i) for i,c in enumerate(chars))
index2char = dict((i,c) for i,c in enumerate(chars))
In [ ]:
nb_chars
Construct input/output data
In [ ]:
SEQLEN = 20
STEP = 1
In [ ]:
input_seqs = []
target_chars = []
In [ ]:
for i in range(0, len(text) - SEQLEN, STEP):
    input_seqs.append(text[i: i+SEQLEN])
    target_chars.append(text[i+SEQLEN])
In [ ]:
len(input_seqs), len(target_chars)
In [ ]:
input_seqs[300000], target_chars[300000]
Vectorize inputs
In [ ]:
import numpy as np
In [ ]:
X = np.zeros((len(input_seqs), SEQLEN, nb_chars), dtype=bool)
In [ ]:
X.shape
In [ ]:
Y = np.zeros((len(input_seqs), nb_chars), dtype=bool)
In [ ]:
Y.shape
An individual input will be a one-hot encoded sequence of 20 characters
In [ ]:
X[0].shape
In [ ]:
for i, input_chars in enumerate(input_seqs):
    for j, ch in enumerate(input_chars):
        X[i, j, char2index[ch]] = 1
    Y[i, char2index[target_chars[i]]] = 1
In [ ]:
X[0]
In [ ]:
Y[0]
Create Model
In [ ]:
from keras.models import Sequential
from keras.layers import Dense, Activation, SimpleRNN
In [ ]:
rnn_model = Sequential()
rnn_model.add(SimpleRNN(units=100, return_sequences=False, unroll=True,
                        input_shape=(SEQLEN, nb_chars)))
rnn_model.add(Dense(nb_chars))
rnn_model.add(Activation("softmax"))
rnn_model.summary()
Exercise: replace the SimpleRNN with an LSTM or GRU. Any difference in model quality or training times?
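One possible starting point for this exercise (a sketch, not the notebook's own solution; the model name lstm_model is mine): swap the SimpleRNN for an LSTM and keep the rest of the architecture identical. A GRU drop-in works the same way.
In [ ]:
from keras.models import Sequential
from keras.layers import Dense, Activation, LSTM

lstm_model = Sequential()
# Same input shape as before: a window of SEQLEN one-hot encoded characters.
lstm_model.add(LSTM(units=100, return_sequences=False,
                    input_shape=(SEQLEN, nb_chars)))
lstm_model.add(Dense(nb_chars))
lstm_model.add(Activation("softmax"))
lstm_model.compile(loss="categorical_crossentropy", optimizer="adam")
lstm_model.summary()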
In [ ]:
rnn_model.compile(loss="categorical_crossentropy", optimizer="adam")
Training/testing procedure
In [ ]:
len(input_seqs)
In [ ]:
for iteration in range(25):
    print()
    print("----------- Iteration number {} --------------".format(iteration))
    # train
    history = rnn_model.fit(X, Y, batch_size=128, epochs=1, validation_split=0.1)
    # generate.
    # get a random input:
    test_idx = np.random.randint(len(input_seqs))
    test_chars = input_seqs[test_idx]
    print("generating from: {}/".format(''.join(test_chars)), end='')
    for _ in range(100):  # characters to generate after seed
        Xtest = np.zeros((1, SEQLEN, nb_chars))
        for i, ch in enumerate(test_chars):
            Xtest[0, i, char2index[ch]] = 1
        pred = rnn_model.predict(Xtest, verbose=0)[0]  # returns the whole softmaxed array
        ypred = index2char[np.argmax(pred)]  # take the highest probability
        print(ypred, end='')
        test_chars = test_chars[1:] + [ypred]  # construct next input sequence
Exercise: Try to fix the repetition problem. Instead of choosing the character with maximal probability after each call to predict, sample from the distribution that predict returns.
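One way to do this (a sketch; the helper name sample_from and the temperature value are illustrative choices, not part of the notebook): treat the softmax output as a probability distribution and draw the next character from it, optionally sharpened or flattened with a temperature.
In [ ]:
import numpy as np

def sample_from(pred, temperature=1.0):
    # Rescale the softmax output with a temperature, then draw one character index.
    pred = np.asarray(pred).astype("float64")
    pred = np.exp(np.log(pred + 1e-8) / temperature)
    pred = pred / np.sum(pred)
    return np.random.choice(len(pred), p=pred)

# Inside the generation loop, replace the argmax line with:
# ypred = index2char[sample_from(pred, temperature=0.5)]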
A sequence-to-sequence (seq2seq) model is a neural network which takes a sequence as input and outputs a sequence (which may be of variable length).
The original seq2seq paper: http://papers.nips.cc/paper/5346-sequence-to-sequence-learning-with-neural-networks.pdf
In [7]:
import numpy as np
In [8]:
TRAINING_SIZE = 100000
DIGITS = 3
INVERT = True
# Maximum length of input is 'int + int' (e.g., '345+678'). Maximum length of
# int is DIGITS.
MAXLEN = DIGITS + 1 + DIGITS
In [9]:
class CharacterTable(object):
    """Given a set of characters:
    + Encode them to a one-hot integer representation
    + Decode the one-hot integer representation to their character output
    + Decode a vector of probabilities to their character output
    """
    def __init__(self, chars):
        """Initialize character table.

        # Arguments
            chars: Characters that can appear in the input.
        """
        self.chars = sorted(set(chars))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))

    def encode(self, C, num_rows):
        """One-hot encode the given string C.

        # Arguments
            num_rows: Number of rows in the returned one-hot encoding. This is
                used to keep the number of rows the same for every data point.
        """
        x = np.zeros((num_rows, len(self.chars)))
        for i, c in enumerate(C):
            x[i, self.char_indices[c]] = 1
        return x

    def decode(self, x, calc_argmax=True):
        if calc_argmax:
            x = x.argmax(axis=-1)
        return ''.join(self.indices_char[i] for i in x)

class colors:
    ok = '\033[92m'
    fail = '\033[91m'
    close = '\033[0m'
In [10]:
# All the numbers, plus sign and space for padding.
chars = '0123456789+ '
ctable = CharacterTable(chars)
questions = [] # will hold the addition problems
expected = [] # will hold the answer to each addition problem
seen = set()
In [11]:
print('Generating data...')
while len(questions) < TRAINING_SIZE:
    f = lambda: int(''.join(np.random.choice(list('0123456789'))
                            for i in range(np.random.randint(1, DIGITS + 1))))
    a, b = f(), f()
    # Skip any addition questions we've already seen.
    # Also skip any such that x+Y == Y+x (hence the sorting).
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)
    # Pad the data with spaces such that it is always MAXLEN.
    q = '{}+{}'.format(a, b)
    query = q + ' ' * (MAXLEN - len(q))
    ans = str(a + b)
    # Answers can be of maximum size DIGITS + 1.
    ans += ' ' * (DIGITS + 1 - len(ans))
    if INVERT:
        # Reverse the query, e.g., '12+345 ' becomes ' 543+21'. (Note the
        # space used for padding.)
        query = query[::-1]
    questions.append(query)
    expected.append(ans)
print('Total addition questions:', len(questions))
In [12]:
questions[42]
Out[12]:
In [13]:
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool)
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool)
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)
In [14]:
x.shape # samples, input sequence length, vocabulary length
Out[14]:
In [15]:
x[0]
Out[15]:
In [16]:
y.shape
Out[16]:
In [17]:
y[0]
Out[17]:
In [18]:
indices = np.arange(len(y))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]
In [ ]:
from keras import layers
from keras.models import Sequential
In [ ]:
RNN = layers.LSTM
HIDDEN_SIZE = 128
BATCH_SIZE = 128
LAYERS = 1
In [22]:
s2s_model = Sequential()
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE.
# Note: in a situation where your input sequences have a variable length,
# use input_shape=(None, num_features).
s2s_model.add(RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))))
# As the decoder RNN's input, repeatedly provide the last output of the encoder
# RNN for each time step. Repeat 'DIGITS + 1' times as that's the maximum
# length of the output, e.g., when DIGITS=3, the max output is 999+999=1998.
s2s_model.add(layers.RepeatVector(DIGITS + 1))
# The decoder RNN could be multiple layers stacked or a single layer.
for _ in range(LAYERS):
    # By setting return_sequences to True, return not only the last output but
    # all the outputs so far in the form of (num_samples, timesteps,
    # output_dim). This is necessary as TimeDistributed below expects
    # the first dimension to be the timesteps.
    s2s_model.add(RNN(HIDDEN_SIZE, return_sequences=True))
# Apply a dense layer to every temporal slice of the input. For each step
# of the output sequence, decide which character should be chosen.
s2s_model.add(layers.TimeDistributed(layers.Dense(len(chars))))
s2s_model.add(layers.Activation('softmax'))
s2s_model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
s2s_model.summary()
In [24]:
for iteration in range(1, 2):  # increase from 2 --> 200 to get a fully-trained model
    print()
    print('-' * 50)
    print('Iteration', iteration)
    s2s_model.fit(x, y,
                  batch_size=BATCH_SIZE,
                  epochs=1,
                  validation_split=0.1)
    # Select 10 samples from the validation set at random so we can visualize
    # errors.
    for i in range(10):
        ind = np.random.randint(0, len(x))
        rowx, rowy = x[np.array([ind])], y[np.array([ind])]
        preds = np.argmax(s2s_model.predict(rowx, verbose=0), axis=-1)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        print('Q', q[::-1] if INVERT else q)
        print('T', correct)
        if correct == guess:
            print(colors.ok + '☑' + colors.close + " ", end='')
        else:
            print(colors.fail + '☒' + colors.close + " ", end='')
        print(guess)
        print('---')
Exercise: implement a convolutional seq2seq algorithm! See: https://arxiv.org/abs/1705.03122 and https://github.com/facebookresearch/fairseq
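A full convolutional seq2seq model is beyond a single cell, but as a starting point here is a sketch (all layer choices and sizes are illustrative, not taken from the paper) that replaces the RNN encoder of the addition model with a stack of Conv1D layers; the decoder is kept as an LSTM. Train it on the same x and y as the RNN model.
In [ ]:
from keras import layers
from keras.models import Sequential

conv_model = Sequential()
# Convolve over the input characters instead of reading them with an RNN.
conv_model.add(layers.Conv1D(128, kernel_size=3, padding="same", activation="relu",
                             input_shape=(MAXLEN, len(chars))))
conv_model.add(layers.Conv1D(128, kernel_size=3, padding="same", activation="relu"))
# Collapse the time axis into a single summary vector.
conv_model.add(layers.GlobalMaxPooling1D())
# Decode as before: repeat the summary vector, one step per output character.
conv_model.add(layers.RepeatVector(DIGITS + 1))
conv_model.add(layers.LSTM(HIDDEN_SIZE, return_sequences=True))
conv_model.add(layers.TimeDistributed(layers.Dense(len(chars), activation="softmax")))
conv_model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
conv_model.summary()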
Suppose you have a large collection of short texts, Tweets, titles, etc., and you'd like to cluster them for purposes of characterization, classification, or anomaly detection. If you can first represent each text as a dense vector, you can then apply clustering and visualization algorithms.
Auto-encoding (transforming the text into something else, then back into itself) is one way to create a dense vector that represents a bit of text. At the end of the encoding process, place a dense layer of the desired dimension. You'll be able to extract these dense vectors after training.
It's not difficult to turn any seq2seq model into an auto-encoder. Notice that a seq2seq model has an encoder stage followed by a decoder stage. In the addition example, the encoder stage is simply the first RNN layer, which outputs a vector of HIDDEN_SIZE (default 128). Verify this by examining s2s_model.summary().
Therefore, to change the addition example into an auto-encoder, place a dense layer right after the first RNN layer. This will take the 128 (default) dimensional vector down to whatever you wish (try 10).
Since you've added an extra twist in the encoder, you'll have to undo it to use the existing decoder. This means you need to add another dense layer to take the dimension back up to 128. Finally, you'll need the length of the output sequence to be the same as the input sequence. This is specified in the RepeatVector layer. Here's how it all looks:
In [26]:
from keras import layers
from keras.models import Sequential
In [27]:
RNN = layers.LSTM
HIDDEN_SIZE = 128
BATCH_SIZE = 128
LAYERS = 1
ENCODED_VECTOR_DIM = 10
DROPOUT = 0.1
In [28]:
s2s_model = Sequential()
## ENCODER STAGE
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE.
# Note: in a situation where your input sequences have a variable length,
# use input_shape=(None, num_features).
s2s_model.add(RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars)), name="rnn_encoder"))
# Add a dense layer to the encoding.
s2s_model.add(layers.Dense(ENCODED_VECTOR_DIM, name="encoded_dense"))
# Upsample for the decoder.
s2s_model.add(layers.Dense(HIDDEN_SIZE, name="upsample"))
# Adding a dropout layer really helps both training speed and the ultimate accuracy of the model.
s2s_model.add(layers.Dropout(DROPOUT))
## DECODER STAGE
# As the decoder RNN's input, repeatedly provide the encoded (and upsampled)
# vector for each time step. Since this is an auto-encoder, the output sequence
# is as long as the input sequence: MAXLEN.
s2s_model.add(layers.RepeatVector(MAXLEN))
# The decoder RNN could be multiple layers stacked or a single layer.
for _ in range(LAYERS):
    # By setting return_sequences to True, return not only the last output but
    # all the outputs so far in the form of (num_samples, timesteps,
    # output_dim). This is necessary as TimeDistributed below expects
    # the first dimension to be the timesteps.
    s2s_model.add(RNN(HIDDEN_SIZE, return_sequences=True))
# Apply a dense layer to every temporal slice of the input. For each step
# of the output sequence, decide which character should be chosen.
s2s_model.add(layers.TimeDistributed(layers.Dense(len(chars))))
s2s_model.add(layers.Activation('softmax'))
s2s_model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
s2s_model.summary()
To train the model, pass it the input again as the target variable:
In [30]:
history = s2s_model.fit(x, x, epochs=2, batch_size=128, shuffle=True, validation_split=0.05)
How do you extract the encoded dense-vectors? Keras can make a new model for you, starting with your current input, and ending with the encoded vectors:
In [34]:
from keras.models import Model
In [35]:
encoder_model = Model(inputs=s2s_model.get_layer('rnn_encoder').input, outputs=s2s_model.get_layer('encoded_dense').output)
To use the encoder, simply pass our input data to its predict method:
In [36]:
encoded = encoder_model.predict(x)
In [37]:
encoded[0]
Out[37]:
Congratulations! You've turned an addition problem into a 10-d vector.
Exercise: run a dimensionality reduction technique, such as PCA or t-SNE, on the collection of 10-d vectors, to reduce them to 2-d vectors. Plot them. Did you achieve a good clustering of addition problems?
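A minimal sketch for this exercise, assuming scikit-learn and matplotlib are available (neither is imported elsewhere in this notebook): project the 10-d encoded vectors down to 2-d with PCA and scatter-plot them.
In [ ]:
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# `encoded` is the (num_samples, ENCODED_VECTOR_DIM) array produced above.
encoded_2d = PCA(n_components=2).fit_transform(encoded)
plt.scatter(encoded_2d[:, 0], encoded_2d[:, 1], s=2, alpha=0.3)
plt.title("PCA of encoded addition problems")
plt.show()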
Exercise: extract the decoder. Since you are not starting with an existing input, Keras requires you to work a little harder. Using the functional API, set up an input for the 10-d encoded vectors, and add the decoder layers one-by-one.
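One way to approach the decoder exercise (a sketch; the slice index assumes the layer order shown by s2s_model.summary(), and the names encoded_input and decoder_model are mine): feed a new 10-d Input through the already-trained layers that follow 'encoded_dense'.
In [ ]:
from keras.models import Model
from keras.layers import Input

encoded_input = Input(shape=(ENCODED_VECTOR_DIM,))
# Everything after the 'encoded_dense' layer is the decoder: upsample, dropout,
# RepeatVector, the RNN, the TimeDistributed dense, and the softmax.
h = encoded_input
for layer in s2s_model.layers[2:]:
    h = layer(h)
decoder_model = Model(inputs=encoded_input, outputs=h)
# decoder_model.predict(encoded) now reconstructs the padded question strings.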
In [ ]:
Install recurrentshop by cloning the repo then running the install script (python setup.py install).
Install seq2seq with pip install git+https://github.com/farizrahman4u/seq2seq.git
In [ ]:
from seq2seq.models import SimpleSeq2Seq
# input_dim is the size of each one-hot character vector (the vocabulary size);
# output_length is the number of characters to predict.
s2s_model = SimpleSeq2Seq(input_dim=nb_chars, hidden_dim=10, output_length=1, output_dim=nb_chars)
s2s_model.compile(loss='categorical_crossentropy', optimizer='adam')
In [ ]:
SimpleSeq2Seq?
In [ ]: