This tutorial shows how to train a character-level language model with a multilayer recurrent neural network. In particular, we will train a multilayer LSTM network that is able to generate President Obama's speeches.
We first download the dataset and show the first few characters.
In [1]:
import os
import urllib
import zipfile

# Download and unpack the training text if it is not already present
if not os.path.exists("char_lstm.zip"):
    urllib.urlretrieve("http://data.mxnet.io/data/char_lstm.zip", "char_lstm.zip")
with zipfile.ZipFile("char_lstm.zip", "r") as f:
    f.extractall("./")
with open('obama.txt', 'r') as f:
    print(f.read()[0:1000])
Then we define a few utility functions to pre-process the dataset.
In [2]:
def read_content(path):
    with open(path) as ins:
        return ins.read()

# Return a dict that maps each char to a unique int id
def build_vocab(path):
    content = list(read_content(path))
    idx = 1  # 0 is left for zero-padding
    the_vocab = {}
    for word in content:
        if len(word) == 0:
            continue
        if not word in the_vocab:
            the_vocab[word] = idx
            idx += 1
    return the_vocab

# Encode a sentence with int ids
def text2id(sentence, the_vocab):
    words = list(sentence)
    return [the_vocab[w] for w in words if len(w) > 0]

# Build the char vocabulary from the input text
vocab = build_vocab("./obama.txt")
print('vocab size = %d' % (len(vocab)))
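As a quick sanity check (not part of the original notebook), we can encode a short string with text2id; the exact integer ids depend on the order in which characters first appear in obama.txt:

# Illustrative check: each character maps to one integer id
sample = "The United States"
ids = text2id(sample, vocab)
print(len(sample) == len(ids))   # True: one id per character
print(ids[:5])                   # first few ids (corpus-dependent values)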
Now we create a multi-layer LSTM model. The definition of the LSTM cell is implemented in lstm.py.
In [3]:
import lstm

# Each line contains at most 129 chars.
seq_len = 129
# embedding dimension, which maps a character to a 256-dimensional vector
num_embed = 256
# number of LSTM layers
num_lstm_layer = 3
# number of hidden units in each LSTM cell
num_hidden = 512

symbol = lstm.lstm_unroll(
    num_lstm_layer,
    seq_len,
    len(vocab) + 1,
    num_hidden=num_hidden,
    num_embed=num_embed,
    num_label=len(vocab) + 1,
    dropout=0.2)

# Check that every line fits within seq_len
data_file = open("./obama.txt")
for line in data_file:
    assert len(line) <= seq_len + 1, \
        "seq_len is smaller than maximum line length. Current line length is %d. Line content is: %s" % (len(line), line)
data_file.close()
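To get a feel for what lstm_unroll returns, we can inspect the unrolled symbol. This is only an illustrative check, assuming symbol is a regular mx.sym.Symbol as in the other MXNet examples:

# Illustrative check: the unrolled network exposes its outputs and learnable parameters
print(symbol.list_outputs())          # softmax output(s) of the unrolled LSTM
print(len(symbol.list_arguments()))   # data/label inputs plus weights and biases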
In [4]:
import bucket_io

# The batch size for training
batch_size = 32

# initialize the states for the LSTM
init_c = [('l%d_init_c' % l, (batch_size, num_hidden)) for l in range(num_lstm_layer)]
init_h = [('l%d_init_h' % l, (batch_size, num_hidden)) for l in range(num_lstm_layer)]
init_states = init_c + init_h

# Even though BucketSentenceIter supports examples of various lengths,
# we simply use the fixed-length version here
data_train = bucket_io.BucketSentenceIter(
    "./obama.txt",
    vocab,
    [seq_len],
    batch_size,
    init_states,
    seperate_char='\n',
    text2id=text2id,
    read_content=read_content)
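Before training, it helps to confirm what the iterator will feed the network. provide_data and provide_label (also used when constructing the Module below) list one named input per data blob, including the per-layer init states; this check is illustrative only:

# Illustrative check: data of shape (batch_size, seq_len), one init state per layer, and labels
print(data_train.provide_data)
print(data_train.provide_label)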
Then we can train with the standard model.fit approach.
In [5]:
# @@@ AUTOTEST_OUTPUT_IGNORED_CELL
import mxnet as mx
import numpy as np
import logging
logging.getLogger().setLevel(logging.DEBUG)

# We will show a quick demo with only 1 epoch. In practice, we can set it to be 100
num_epoch = 1
# learning rate
learning_rate = 0.01

# Evaluation metric
def Perplexity(label, pred):
    loss = 0.
    for i in range(pred.shape[0]):
        loss += -np.log(max(1e-10, pred[i][int(label[i])]))
    return np.exp(loss / label.size)

model = mx.mod.Module(symbol=symbol,
                      data_names=[x[0] for x in data_train.provide_data],
                      label_names=[y[0] for y in data_train.provide_label],
                      context=[mx.gpu(0)])

model.fit(train_data=data_train,
          num_epoch=num_epoch,
          optimizer='sgd',
          optimizer_params={'learning_rate': learning_rate, 'momentum': 0, 'wd': 0.0001},
          initializer=mx.init.Xavier(factor_type="in", magnitude=2.34),
          eval_metric=mx.metric.np(Perplexity),
          batch_end_callback=mx.callback.Speedometer(batch_size, 20),
          epoch_end_callback=mx.callback.do_checkpoint("obama"))
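The Perplexity metric above is simply the exponential of the average negative log-likelihood that the model assigns to the correct character, so 1.0 is a perfect score and higher is worse. A small worked example with made-up probabilities (purely illustrative):

# Two toy predictions over a 3-symbol vocabulary
toy_label = np.array([2, 0])                      # correct class ids
toy_pred = np.array([[0.1, 0.2, 0.7],             # p(correct) = 0.7
                     [0.5, 0.25, 0.25]])          # p(correct) = 0.5
print(Perplexity(toy_label, toy_pred))            # exp(-(ln 0.7 + ln 0.5) / 2) ~= 1.69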
In [6]:
from rnn_model import LSTMInferenceModel
import random
import bisect

# helper structure for prediction
def MakeRevertVocab(vocab):
    dic = {}
    for k, v in vocab.items():
        dic[v] = k
    return dic

# make input from char
def MakeInput(char, vocab, arr):
    idx = vocab[char]
    tmp = np.zeros((1,))
    tmp[0] = idx
    arr[:] = tmp

# helper functions for random sampling
def _cdf(weights):
    total = sum(weights)
    result = []
    cumsum = 0
    for w in weights:
        cumsum += w
        result.append(cumsum / total)
    return result

def _choice(population, weights):
    assert len(population) == len(weights)
    cdf_vals = _cdf(weights)
    x = random.random()
    idx = bisect.bisect(cdf_vals, x)
    return population[idx]

# we can either sample the output or take the character with the largest probability
def MakeOutput(prob, vocab, sample=False, temperature=1.):
    if sample == False:
        idx = np.argmax(prob, axis=1)[0]
    else:
        fix_dict = [""] + [vocab[i] for i in range(1, len(vocab) + 1)]
        scale_prob = np.clip(prob, 1e-6, 1 - 1e-6)
        rescale = np.exp(np.log(scale_prob) / temperature)
        rescale[:] /= rescale.sum()
        return _choice(fix_dict, rescale[0, :])
    try:
        char = vocab[idx]
    except KeyError:
        char = ''
    return char
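The _cdf/_choice pair implements simple inverse-CDF (roulette-wheel) sampling from a list of unnormalized weights, which MakeOutput uses when sample=True. A toy illustration with made-up weights:

# Illustrative only: draw from ['a', 'b', 'c'] with relative weights 1 : 2 : 7
print(_cdf([1., 2., 7.]))                                  # [0.1, 0.3, 1.0]
draws = [_choice(['a', 'b', 'c'], [1., 2., 7.]) for _ in range(1000)]
print(draws.count('c') / float(len(draws)))                # roughly 0.7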
Then we create the inference model:
In [7]:
import rnn_model

# load from checkpoint (epoch 75 assumes a full training run;
# for the 1-epoch demo above, load epoch 1 instead)
_, arg_params, __ = mx.model.load_checkpoint("obama", 75)

# build an inference model
model = rnn_model.LSTMInferenceModel(
    num_lstm_layer,
    len(vocab) + 1,
    num_hidden=num_hidden,
    num_embed=num_embed,
    num_label=len(vocab) + 1,
    arg_params=arg_params,
    ctx=mx.gpu(),
    dropout=0.2)
Now we can generate a sequence of 600 characters starting with "The United States".
In [8]:
seq_length = 600
input_ndarray = mx.nd.zeros((1,))
revert_vocab = MakeRevertVocab(vocab)
# Feel free to change the starter sentence
output = 'The United States'
random_sample = False
new_sentence = True

ignore_length = len(output)

for i in range(seq_length):
    if i <= ignore_length - 1:
        MakeInput(output[i], vocab, input_ndarray)
    else:
        MakeInput(output[-1], vocab, input_ndarray)
    prob = model.forward(input_ndarray, new_sentence)
    new_sentence = False
    next_char = MakeOutput(prob, revert_vocab, random_sample)
    if next_char == '':
        new_sentence = True
    if i >= ignore_length - 1:
        output += next_char
print(output)
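With random_sample = False the loop above always picks the most likely next character (greedy decoding), which can get repetitive. For more varied text we can sample from the softmax distribution instead; the call below is an illustrative variant using the temperature argument of MakeOutput:

# Illustrative: temperature < 1.0 sharpens the distribution, > 1.0 flattens it
next_char = MakeOutput(prob, revert_vocab, sample=True, temperature=0.8)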