This tutorial shows how to train a character-level language model with a multilayer recurrent neural network. In particular, we will train a multilayer LSTM network that learns to generate text in the style of President Obama's speeches.
We first download the dataset and show the first few characters.
In [1]:
    
import os
import zipfile
try:
    from urllib.request import urlretrieve  # Python 3
except ImportError:
    from urllib import urlretrieve           # Python 2
# Download and extract the dataset if it is not already present
if not os.path.exists("char_lstm.zip"):
    urlretrieve("http://data.mxnet.io/data/char_lstm.zip", "char_lstm.zip")
with zipfile.ZipFile("char_lstm.zip", "r") as f:
    f.extractall("./")
# Show the first 1000 characters of the training text
with open('obama.txt', 'r') as f:
    print(f.read()[0:1000])
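To get a sense of how much text we are working with, we can count the lines and characters in obama.txt; a quick sketch:
# Count lines and characters in the training text
with open('obama.txt', 'r') as f:
    lines = f.readlines()
print('%d lines, %d characters' % (len(lines), sum(len(line) for line in lines)))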
    
    
Then we define a few utility functions to pre-process the dataset.
In [2]:
    
def read_content(path):
    with open(path) as ins:        
        return ins.read()
    
# Return a dict that maps each char to a unique int id
def build_vocab(path):
    content = list(read_content(path))
    idx = 1 # 0 is left for zero-padding
    the_vocab = {}
    for word in content:
        if len(word) == 0:
            continue
        if word not in the_vocab:
            the_vocab[word] = idx
            idx += 1
    return the_vocab
# Encode a sentence with int ids
def text2id(sentence, the_vocab):
    words = list(sentence)
    return [the_vocab[w] for w in words if len(w) > 0]
            
# build the char vocabulary from the input text
vocab = build_vocab("./obama.txt")
print('vocab size = %d' %(len(vocab)))
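As a quick sanity check of these helpers, we can encode a short string and map the ids back to characters. This is a minimal sketch reusing the functions above; it assumes every character of the sample string appears in the training text:
# Encode a sample string and invert the mapping to recover the original text
sample = "The United States"
ids = text2id(sample, vocab)
print(ids)
id2char = {v: k for k, v in vocab.items()}
print(''.join(id2char[i] for i in ids))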
    
    
Now we create a multi-layer LSTM model. The definition of the LSTM cell is implemented in lstm.py.
In [3]:
    
import lstm
# Each line contains at most 129 chars. 
seq_len = 129
# embedding dimension, which maps a character to a 256-dimension vector
num_embed = 256
# number of lstm layers
num_lstm_layer = 3
# hidden unit in LSTM cell
num_hidden = 512
symbol = lstm.lstm_unroll(
    num_lstm_layer, 
    seq_len,
    len(vocab) + 1,
    num_hidden=num_hidden,
    num_embed=num_embed,
    num_label=len(vocab) + 1, 
    dropout=0.2)
"""test_seq_len"""
data_file = open("./obama.txt")
for line in data_file:
    assert len(line) <= seq_len + 1, "seq_len is smaller than maximum line length. Current line length is %d. Line content is: %s" % (len(line), line)
data_file.close()
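The unrolled symbol exposes its inputs and outputs by name, and listing them is a handy way to see what the network expects (the exact names depend on how lstm.py defines its variables). A quick sketch:
# List the inputs (data, label, weights, initial states) and the outputs of the unrolled network
print(symbol.list_arguments())
print(symbol.list_outputs())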
    
In [4]:
    
import bucket_io
# The batch size for training
batch_size = 32
# initialize states for the LSTM
init_c = [('l%d_init_c'%l, (batch_size, num_hidden)) for l in range(num_lstm_layer)]
init_h = [('l%d_init_h'%l, (batch_size, num_hidden)) for l in range(num_lstm_layer)]
init_states = init_c + init_h
# Even though BucketSentenceIter supports variable-length examples,
# we simply use a single fixed-length bucket here
data_train = bucket_io.BucketSentenceIter(
    "./obama.txt", 
    vocab, 
    [seq_len], 
    batch_size,             
    init_states, 
    seperate_char='\n',
    text2id=text2id, 
    read_content=read_content)
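Before training, it is worth checking the shapes the iterator will feed into the network; provide_data and provide_label describe the data and label arrays for each batch (including the initial-state arrays we just declared):
# Inspect the (name, shape) pairs the iterator provides for each batch
print(data_train.provide_data)
print(data_train.provide_label)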
    
    
Then we can train with the standard model.fit approach.
In [5]:
    
import mxnet as mx
import numpy as np
import logging
logging.getLogger().setLevel(logging.DEBUG)
# We show a quick demo with only 1 epoch. In practice, set it to something like 100
num_epoch = 1
# learning rate 
learning_rate = 0.01
# Evaluation metric: perplexity, the exponential of the average per-character cross-entropy
def Perplexity(label, pred):
    loss = 0.
    for i in range(pred.shape[0]):
        loss += -np.log(max(1e-10, pred[i][int(label[i])]))
    return np.exp(loss / label.size)
model = mx.mod.Module(symbol=symbol,
                      data_names=[x[0] for x in data_train.provide_data],
                      label_names=[y[0] for y in data_train.provide_label],
                      context=[mx.gpu(0)])
model.fit(train_data=data_train,
          num_epoch=num_epoch,
          optimizer='sgd',
          optimizer_params={'learning_rate':learning_rate, 'momentum':0, 'wd':0.0001},
          initializer=mx.init.Xavier(factor_type="in", magnitude=2.34),
          eval_metric=mx.metric.np(Perplexity),
          batch_end_callback=mx.callback.Speedometer(batch_size, 20),
          epoch_end_callback=mx.callback.do_checkpoint("obama"))
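Lower perplexity is better: a perplexity of k means the model is, on average, about as uncertain as a uniform choice among k characters. A tiny worked example with the Perplexity function defined above:
# Toy check: two characters, each predicted with probability 0.5,
# give perplexity exp(-log 0.5) = 2
toy_label = np.array([0., 1.])
toy_pred = np.array([[0.5, 0.5], [0.5, 0.5]])
print(Perplexity(toy_label, toy_pred))  # prints ~2.0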
    
    
In [6]:
    
import random
import bisect
# build a reverse mapping (int id -> char) for decoding predictions
def MakeRevertVocab(vocab):
    dic = {}
    for k, v in vocab.items():
        dic[v] = k
    return dic
# make input from char
def MakeInput(char, vocab, arr):
    idx = vocab[char]
    tmp = np.zeros((1,))
    tmp[0] = idx
    arr[:] = tmp
# helper functions for random sampling
def _cdf(weights):
    total = sum(weights)
    result = []
    cumsum = 0
    for w in weights:
        cumsum += w
        result.append(cumsum / total)
    return result
def _choice(population, weights):
    assert len(population) == len(weights)
    cdf_vals = _cdf(weights)
    x = random.random()
    idx = bisect.bisect(cdf_vals, x)
    return population[idx]
# Pick the next character: either greedily (largest probability) or by sampling
# from the temperature-scaled distribution
def MakeOutput(prob, vocab, sample=False, temperature=1.):
    if not sample:
        idx = np.argmax(prob, axis=1)[0]
    else:
        fix_dict = [""] + [vocab[i] for i in range(1, len(vocab) + 1)]
        scale_prob = np.clip(prob, 1e-6, 1 - 1e-6)
        rescale = np.exp(np.log(scale_prob) / temperature)
        rescale[:] /= rescale.sum()
        return _choice(fix_dict, rescale[0, :])
    try:
        char = vocab[idx]
    except KeyError:
        char = ''
    return char
    
Then we create the inference model:
In [7]:
    
import rnn_model 
# load the parameters saved at the last training epoch
# (if you trained longer, e.g. 75 epochs, load that checkpoint instead)
_, arg_params, __ = mx.model.load_checkpoint("obama", num_epoch)
# build an inference model
model = rnn_model.LSTMInferenceModel(
    num_lstm_layer,
    len(vocab) + 1,
    num_hidden=num_hidden,
    num_embed=num_embed,
    num_label=len(vocab) + 1, 
    arg_params=arg_params, 
    ctx=mx.gpu(), 
    dropout=0.2)
    
Now we can generate a sequence of 600 characters starting with "The United States".
In [8]:
    
seq_length = 600
input_ndarray = mx.nd.zeros((1,))
revert_vocab = MakeRevertVocab(vocab)
# Feel free to change the starter sentence
output = 'The United States'
random_sample = False
new_sentence = True
ignore_length = len(output)
for i in range(seq_length):
    if i <= ignore_length - 1:
        MakeInput(output[i], vocab, input_ndarray)
    else:
        MakeInput(output[-1], vocab, input_ndarray)
    prob = model.forward(input_ndarray, new_sentence)
    new_sentence = False
    next_char = MakeOutput(prob, revert_vocab, random_sample)
    if next_char == '':
        new_sentence = True
    if i >= ignore_length - 1:
        output += next_char
print(output)
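Greedy decoding (always taking the arg-max) tends to produce repetitive text. To get more varied output, we can sample from the temperature-scaled distribution instead; the sketch below reuses the helpers above, and the temperature value 0.7 is an arbitrary choice (lower values stay closer to greedy decoding):
# Generate again, this time sampling the next character instead of taking the arg-max
output = 'The United States'
ignore_length = len(output)
new_sentence = True
for i in range(seq_length):
    # feed the starter sentence first, then feed back the last generated character
    MakeInput(output[i] if i < ignore_length else output[-1], vocab, input_ndarray)
    prob = model.forward(input_ndarray, new_sentence)
    new_sentence = False
    next_char = MakeOutput(prob, revert_vocab, sample=True, temperature=0.7)
    if next_char == '':
        new_sentence = True
    if i >= ignore_length - 1:
        output += next_char
print(output)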